diff --git a/CODEOWNERS b/CODEOWNERS index bba5f83..857800c 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -7,3 +7,8 @@ # When there are changes in the GUI folder, @GiacomoValliPhD # and @PaulRitsche will be requested for review. /openhdemg/gui/ @GiacomoValliPhD @PaulRitsche + +# When there are changes in the pic module, @GiacomoValliPhD +# and @JABeauchamp will be requested for review. +/openhdemg/library/pic.py @GiacomoValliPhD @JABeauchamp +/openhdemg/library/tools.py @GiacomoValliPhD @JABeauchamp diff --git a/docs/api_analysis.md b/docs/api_analysis.md index efe84d9..6371c3a 100644 --- a/docs/api_analysis.md +++ b/docs/api_analysis.md @@ -1,6 +1,9 @@ Description ----------- -This module contains all the functions used to analyse the MUs properties when not involving the MUs action potential shape. +This module contains all the functions used to analyse the MUs properties apart from those involving: + +- The MUs action potential shape => Available in the [muap module](api_muap.md) +- DeltaF / PICs estimation => available in the [pic module](api_pic.md)
diff --git a/docs/api_muap.md b/docs/api_muap.md index b7e8fb3..5a1707d 100644 --- a/docs/api_muap.md +++ b/docs/api_muap.md @@ -68,6 +68,13 @@ This module contains functions to produce and analyse MUs anction potentials
+::: openhdemg.library.muap.Tracking_gui + options: + show_root_full_path: False + show_root_heading: True + +
+ ::: openhdemg.library.muap.remove_duplicates_between options: show_root_full_path: False diff --git a/docs/api_pic.md b/docs/api_pic.md new file mode 100644 index 0000000..d2bdfd8 --- /dev/null +++ b/docs/api_pic.md @@ -0,0 +1,14 @@ +Description +----------- +This module contains all the functions used to quantify and analyze MU persistent inward currents. + +Currently includes delta F. + +
+ +::: openhdemg.library.pic.compute_deltaf + options: + show_root_full_path: False + show_root_heading: True + +
\ No newline at end of file diff --git a/docs/api_plotemg.md b/docs/api_plotemg.md index dc815d4..8da6229 100644 --- a/docs/api_plotemg.md +++ b/docs/api_plotemg.md @@ -53,6 +53,13 @@ This module contains all the functions used to visualise the content of the impo
+::: openhdemg.library.plotemg.plot_smoothed_dr + options: + show_root_full_path: False + show_root_heading: True + +
+ ::: openhdemg.library.plotemg.plot_muaps options: show_root_full_path: False diff --git a/docs/api_tools.md b/docs/api_tools.md index f65b7bd..b421ad0 100644 --- a/docs/api_tools.md +++ b/docs/api_tools.md @@ -104,3 +104,10 @@ shortcuts necessary to operate with the HD-EMG recordings. show_root_heading: True
+ +::: openhdemg.library.tools.compute_svr + options: + show_root_full_path: False + show_root_heading: True + +
diff --git a/docs/gui_advanced.md b/docs/gui_advanced.md index c2999b7..4a974ed 100644 --- a/docs/gui_advanced.md +++ b/docs/gui_advanced.md @@ -8,13 +8,14 @@ Please note, the `Advanced Tools` might not be available for all the files, as s ## Start a Specific Tool -So far, we have included three advanced analyses in the *openhdemg* GUI. +So far, we have included four advanced analyses in the *openhdemg* GUI. - `Motor Unit Tracking` - `Duplicate Removal` - `Conduction Velocity Estimation` +- `Persistent Inward Currents` -For all of those, the specification of a `Matrix Code` and a `Matrix Orientation` is required. +For some of these analyses, the specification of a `Matrix Code` and a `Matrix Orientation` is required. The `Matrix Code` must be specified according to the one you used during acquisition. So far, the implemented codes are: @@ -33,6 +34,8 @@ Rows, Columns: 13, 5 ``` means that your File has 65 channels organised over 13 rows and 5 columns. +The use of `None` is suggested anytime your loaded file contains already sorted EMG channels and you want to avoid further sorting. + If you selected one of the built-in sorting orders (e.g., `GR08MM1305`, `GR04MM1305`, `GR10MM0808`), you need to specify also the `Orientation` in row two and column four in the left side of the `Plot Window`. The `Orientaion` must match the one of your matrix during acquisition. You can find a reference image for the `Orientation` at the bottom in the right side of the `Plot Window`. `Orientation` is ignored when `Matrix Code` is `None` or `Custom order`. Once you specified these parameter, you can click the `Advaned Analysis` button to start your analysis. @@ -66,7 +69,7 @@ When you want to track MUs across two different files, you need to select the `M 7. The `Show` checkbox indicates whether to plot the spike triggered average of pairs of MUs with cross-correlation above `Threshold`. -8. By clicking the `Track` button, you can start the analysis. The tracking results will be displayed in the `MUs Tracking Resul` output in the right side of the `MUs Tracking Window`. +8. By clicking the `Track` button, you can start the analysis. The tracking results will be displayed in the `MUs Tracking Result` output in the right side of the `MUs Tracking Window`. ## Duplicate Removal @@ -111,6 +114,24 @@ Prior to calculation of the `Conduction Velocity` you need to load a file in the We are now at the end of describing the advanced functions included in the *openhdemg* GUI. If you want to take a look at more basic stuff, check out the [basic](gui_basics.md). +## Persistent Inward Currents + +When you want to estimate Persistent Inward Currents, you need to select the `Persistent Inward Currents` option. In this case, `Matrix Code` and `Matrix Orentation` will be locked, as these are not needed for the analysis. Once you clicked the `Advanced Analysis` button, the `Persistent Inward Currents Window` will pop-up. + +![persistent_inward_currents](md_graphics/gui/pics_window.png) + +Please note that it is suggested to [sort the MUs](gui_basics.md/#motor-unit-sorting) based on recruitment order before performing the PICs estimation. This can be done in the main GUI window. + +1. In row 1 of this window you can select the smoothing tecnique to adopt (at the time of writing, only the `Support Vector Regression` is available, but more are under development). + +2. Select the `Average Method` to use for test MU deltaF value. + +3. Select the method for deltaF `Normalisation`. + +4. Select the `Clean` option if you want to remove values that do not meet exclusion criteria. + +5. By clicking the `Compute PIC` button, you can start the analysis. The results will be displayed in the right side of the `Persistent Inward Currents Window`. + ## More questions? We hope that this tutorial was useful. If you need any additional information, do not hesitate to read the answers or ask a question in the [*openhdemg* discussion section](https://github.com/GiacomoValliPhD/openhdemg/discussions){:target="_blank"}. If you are not familiar with GitHub discussions, please read this [post](https://github.com/GiacomoValliPhD/openhdemg/discussions/42){:target="_blank"}. This will allow the *openhdemg* community to answer your questions. diff --git a/docs/gui_settings.md b/docs/gui_settings.md index ec04841..17a872a 100644 --- a/docs/gui_settings.md +++ b/docs/gui_settings.md @@ -8,6 +8,10 @@ Accessing these GUI settings is simple: just click on the Gear icon located at t Upon clicking the settings icon, a Python file will open in your default text editor. Here, you'll be able to modify the settings according to your requirements. +Always remember to **save the Python settings file after changing the variables**, or the changes will not be effective. + +If the Python file does not open, please read the [Troubleshooting section](#troubleshooting). + ## Modify GUI settings The GUI settings file (named *settings.py*) is organised into distinct topic sections. @@ -18,7 +22,7 @@ All the variables that can be modified are labelled as: In this way, the values that each variable can assume can be discovered from the API section of this website by navigating into the topic, then into the Name of the function and then looking at the specific variable. -The variables that can be modified from the GUI settings, as well as their values, can differ between different *openhdemg* releases. Therefore, the user is always encouraged to check the specifics APIs, and not to rely on this guide, which only serves didactical purposes. +**The variables that can be modified from the GUI settings, as well as their values, can differ between different *openhdemg* releases. Therefore, the user is always encouraged to check the specifics APIs, and not to rely on this guide, which only serves didactical purposes.** ### openfiles @@ -89,6 +93,22 @@ resize_emgfile__ignore_negative_ipts = False The corresponding APIs for the function *resize_emgfile* can be accessed [here](api_tools.md/#openhdemg.library.tools.resize_emgfile). +### pic + +The section named *pic* controls the Persistent Inward Currents estimation. + +The API section for this topic is [here](api_pic.md). + +For example, in the following code snippet we can adjust how to estimate Delta F. + +```Python +compute_deltaf__recruitment_difference_cutoff = 1.0 +compute_deltaf__corr_cutoff = 0.7 +compute_deltaf__controlunitmodulation_cutoff = 0.5 +``` + +The corresponding APIs for the function *compute_deltaf* can be accessed [here](api_pic.md/#openhdemg.library.pic.compute_deltaf). + ### muap The section named *muap* controls all the functionalities that require MU action potentials. Currently, it controls the behaviour of functionalities such as MU tracking, duplicate removal and conduction velocity estimation, among others. @@ -189,6 +209,44 @@ Additionally, you cannot remove the unused variables from the settings file! You If you accidentally modify some variables and the GUI stops working properly, you can restore the original settings by copying and pasting the content of the *backup_settings.py* file. This will be visible in the file explorer of your editor next to the *settings.py* file. +## Troubleshooting + +### The settings don't show up + +If clicking the Gear icon doesn't open the *settings.py* file, it might be because your operating system doesn't recognize the .py file extension. This can happen if you've never opened a Python file before. To solve this: + +1. Double-click on any Python file (do this from outside Visual Studio Code, or it will take care of the process and your operating system will not associate the Python file to a specific software). + +2. A window should prompt you to choose which software to use to open the file. + +3. Select Visual Studio Code and set it as the default application for .py files. + +4. After doing this, try restarting the *openhdemg* GUI and Visual Studio Code. The Gear icon should now function correctly. + +If the issue persists, please continue reading. + +### Locate the settings.py file + +If the *settings.py* file does not open after clicking on the Gear icon, you can still manually navigate to this file and change the settings. The *settings.py* file can be accessed navigating the file explorer of your editor (usually on the left side of Visual Studio Code). + +In your file explorer, navigate as follows: + +1. Click on your virtual environment folder + +2. Click on *Lib* + +3. Click on *openhdemg* + +4. Click on *gui* + +5. Here you will find the *settings.py* file. Double click it and edit the settings as needed. + +6. Save the file + +### Changes are not effective + +Always remember to save the Python settings file after changing the variables, or the changes will not be effective. It is not necessary to restart the GUI when changes are made to the settings.py file. + ## More questions? We hope that this tutorial was useful. If you need any additional information, do not hesitate to read the answers or ask a question in the [*openhdemg* discussion section](https://github.com/GiacomoValliPhD/openhdemg/discussions){:target="_blank"}. If you are not familiar with GitHub discussions, please read this [post](https://github.com/GiacomoValliPhD/openhdemg/discussions/42){:target="_blank"}. This will allow the *openhdemg* community to answer your questions. \ No newline at end of file diff --git a/docs/isek_jek_tutorials.md b/docs/isek_jek_tutorials.md index c69dec6..7095818 100644 --- a/docs/isek_jek_tutorials.md +++ b/docs/isek_jek_tutorials.md @@ -26,7 +26,7 @@ You can download the sample files and the sample scripts [here](https://drive.go
-## 2023 ISEK Workshop +## 2024 ISEK Workshop **Workshop: Simplified analysis of motor unit properties with *openhdemg*.** diff --git a/docs/md_graphics/gui/pics_window.png b/docs/md_graphics/gui/pics_window.png new file mode 100644 index 0000000..ff932d7 Binary files /dev/null and b/docs/md_graphics/gui/pics_window.png differ diff --git a/docs/overrides/main.html b/docs/overrides/main.html index 43eddac..d22b522 100644 --- a/docs/overrides/main.html +++ b/docs/overrides/main.html @@ -7,13 +7,9 @@ {% block announce %}
-

2 X 🎉 == Double News!

-

The 4th openhdemg beta release is now available! =>

- See what's new +

🚀 Release V0.1.0

+

The first openhdemg stable version is here! =>

+ See the new functionalities
-

We are thrilled to announce the publication of our Tutorial article! =>

- Read it Now -
-
{% endblock %} \ No newline at end of file diff --git a/docs/what's-new.md b/docs/what's-new.md index 4224d8e..4ccd32a 100644 --- a/docs/what's-new.md +++ b/docs/what's-new.md @@ -1,3 +1,47 @@ +## :octicons-tag-24: 0.1.0 +:octicons-clock-24: June 2024 + +This release is aimed at expanding the functionalities of the library with new and improved analysis tools. It also marks the exit from the beta phase! + +### Backward Compatibility + +This version is fully backward compatible with v0.1.0-beta.3 and 4. + +### Major Achievements +- **More functionalities** + +### Major Changes + +- **New modules**: + + - With this release, we introduce the module `pic`. A module dedicated to Persistent Inward Currents estimation. It now allows to estimate PICs via Delta F and will be enriched with new functionalities in the near future. + +- **New classes**: + + - The class `Tracking_gui` provides a convenient interface for the visual inspection of the tracking results, improving the flexibility and accuracy of MUs tracking based on MUAPs shape. + +- **Updated classes**: + + - The class `MUcv_gui` has been optimised to reduce RAM memory usage and can now be expanded to full-screen. Furthermore, it now accepts custom separators for copying the results and pasting them into Excel (or any other text/tabular document). + +- **New functions**: + + - The function `compute_svr` allows to fit MU discharge rates with Support Vector Regression, nonlinear regression. + - The function `compute_deltaf` allows to quantify delta F via paired motor unit analysis. + - The function `plot_smoothed_dr` allows to visualise the smoothed discharge rate, with or without IDR and with or without stacking MUs. + +- **Updated functions**: + + - The functions `tracking()` and `remove_duplicates_between()` now allow to directly show the `Tracking_gui` after completing the tracking procedure for additional inspection of the results. They also allow to select whether to use parallel processing to improve execution speed. + - The functions `plot_muaps` and `plot_muaps_for_cv` now allow to use a tight layout for the output figure. + - The functions `compute_dr`, `compute_drvariability`, `compute_covisi` and `basic_mus_properties` now allow to exclude firings outside a custom range of frequencies. + +- **GUI updates**: + + - The openhdemg GUI allows access to the `Tracking_gui` during MUs tracking (not after duplicates removal) and to perform PICs estimation via the advanced tools window. It also allows tuning the behaviour of these functionalities through the settings window. + +
+ ## :octicons-tag-24: 0.1.0-beta.4 :octicons-clock-24: April 2024 diff --git a/mkdocs.yml b/mkdocs.yml index b537729..750bd91 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -83,6 +83,7 @@ nav: - openfiles: api_openfiles.md - plotemg: api_plotemg.md - analysis: api_analysis.md + - pic: api_pic.md - tools: api_tools.md - mathtools: api_mathtools.md - muap: api_muap.md diff --git a/openhdemg/__init__.py b/openhdemg/__init__.py index 578cf8c..a05eb9a 100644 --- a/openhdemg/__init__.py +++ b/openhdemg/__init__.py @@ -1,3 +1,3 @@ __all__ = ["__version__"] -__version__ = "0.1.0-beta.4" +__version__ = "0.1.0" diff --git a/openhdemg/gui/backup_settings.py b/openhdemg/gui/backup_settings.py index fb335ea..b477037 100644 --- a/openhdemg/gui/backup_settings.py +++ b/openhdemg/gui/backup_settings.py @@ -19,30 +19,31 @@ import numpy as np + # --------------------------------- openfiles --------------------------------- -# in emg_from_demuse() +# In emg_from_demuse() emg_from_demuse__ignore_negative_ipts = False -# in emg_from_otb() +# In emg_from_otb() emg_from_otb__ext_factor = 8 emg_from_otb__refsig = [True, "fullsampled"] emg_from_otb__extras = None emg_from_otb__ignore_negative_ipts = False -# in refsig_from_otb() +# In refsig_from_otb() refsig_from_otb__refsig = "fullsampled" refsig_from_otb__extras = None -# in emg_from_delsys() +# In emg_from_delsys() emg_from_delsys__emg_sensor_name = "Galileo sensor" emg_from_delsys__refsig_sensor_name = "Trigno Load Cell" emg_from_delsys__filename_from = "mus_directory" -# in refsig_from_delsys() +# In refsig_from_delsys() refsig_from_delsys__refsig_sensor_name = "Trigno Load Cell" -# in emg_from_customcsv() +# In emg_from_customcsv() emg_from_customcsv__ref_signal = "REF_SIGNAL" emg_from_customcsv__raw_signal = "RAW_SIGNAL" emg_from_customcsv__ipts = "IPTS" @@ -53,48 +54,71 @@ emg_from_customcsv__fsamp = 2048 emg_from_customcsv__ied = 8 -# in refsig_from_customcsv() +# In refsig_from_customcsv() refsig_from_customcsv__ref_signal = "REF_SIGNAL" refsig_from_customcsv__extras = "EXTRAS" refsig_from_customcsv__fsamp = 2048 -# in save_json_emgfile() +# In save_json_emgfile() save_json_emgfile__compresslevel = 4 # ---------------------------------- analysis --------------------------------- -# in compute_thresholds() +# In compute_thresholds() compute_thresholds__n_firings = 1 -# in basic_mus_properties() +# In compute_dr() +compute_dr__idr_range = None + +# In basic_mus_properties() basic_mus_properties__n_firings_rt_dert = 1 basic_mus_properties__accuracy = "default" basic_mus_properties__ignore_negative_ipts = False basic_mus_properties__constrain_pulses = [True, 3] +basic_mus_properties__idr_range = None # ----------------------------------- tools ----------------------------------- -# in resize_emgfile() +# In resize_emgfile() resize_emgfile__how = "ref_signal" resize_emgfile__accuracy = "recalculate" resize_emgfile__ignore_negative_ipts = False +# In compute_svr() +compute_svr__gammain = 1/1.6 +compute_svr__regparam = 1/0.370 +compute_svr__endpointweights_numpulses = 5 +compute_svr__endpointweights_magnitude = 5 +compute_svr__discontfiring_dur = 1.0 + + +# ------------------------------------ pic ------------------------------------ +# In compute_deltaf() +compute_deltaf__recruitment_difference_cutoff = 1.0 +compute_deltaf__corr_cutoff = 0.7 +compute_deltaf__controlunitmodulation_cutoff = 0.5 + # ------------------------------------ muap ----------------------------------- -# in tracking() +# In tracking() tracking__firings = "all" tracking__derivation = "sd" +tracking__gui = True +tracking__gui_addrefsig = True +tracking__gui_csv_separator = "\t" -# in remove_duplicates_between() +# In remove_duplicates_between() remove_duplicates_between__firings = "all" remove_duplicates_between__derivation = "sd" -# in MUcv_gui() +# In MUcv_gui() MUcv_gui__n_firings = [0, 50] MUcv_gui__muaps_timewindow = 50 MUcv_gui__figsize = [25, 20] +MUcv_gui__csv_separator = "\t" + # --------------------------------- electrodes -------------------------------- # This custom sorting order is valid for all the GUI windows, although the diff --git a/openhdemg/gui/gui_modules/advanced_analyses.py b/openhdemg/gui/gui_modules/advanced_analyses.py index a4e95cc..2dab858 100644 --- a/openhdemg/gui/gui_modules/advanced_analyses.py +++ b/openhdemg/gui/gui_modules/advanced_analyses.py @@ -101,6 +101,10 @@ def __init__(self, parent): self.filter_adv = BooleanVar() self.show_adv = BooleanVar() self.which_adv = StringVar() + self.average_method_adv = StringVar() + self.smoothing_method_adv = StringVar() + self.normalisation_adv = StringVar() + self.clean_adv = StringVar() # TODO this may become a bool self.emgfile1 = {} self.emgfile2 = {} self.extension_factor_adv = StringVar() @@ -109,27 +113,6 @@ def __init__(self, parent): self.parent = parent self.parent.load_settings() - # Disable config for DELSYS files - try: - if self.parent.resdict["SOURCE"] == "DELSYS": - show_error_dialog( - parent=self, - error=None, - solution=str( - "Advanced Tools for Delsys are only accessible from " - + "the library." - ), - ) - return - except AttributeError: - pass # Allow to open advanced tools even if no file is loaded - except Exception as e: - show_error_dialog( - parent=self, error=e, - solution=str("An unexpected error occurred."), - ) - return - # Open window self.a_window = ctk.CTkToplevel(fg_color="LightBlue4") self.a_window.title("Advanced Tools Window") @@ -137,7 +120,8 @@ def __init__(self, parent): # Set window icon head_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) iconpath = head_path + "/gui_files/Icon_transp.ico" - self.a_window.iconbitmap(default=iconpath) + self.a_window.iconbitmap(iconpath) + if platform.startswith("win"): self.a_window.after(200, lambda: self.a_window.iconbitmap(iconpath)) @@ -170,14 +154,16 @@ def __init__(self, parent): "Motor Unit Tracking", "Duplicate Removal", "Conduction Velocity", + "Persistent Inward Currents", ) self.advanced_method = StringVar() adv_box = ctk.CTkComboBox( self.a_window, - width=200, + width=260, variable=self.advanced_method, values=adv_box_values, state="readonly", + command=self.enable_disable_a_window_elements ) adv_box.grid(row=2, column=1, sticky=(W, E)) self.advanced_method.set("Motor Unit Tracking") @@ -188,14 +174,14 @@ def __init__(self, parent): font=("Segoe UI", 18, "bold"), ).grid(row=3, column=0, sticky=(W, E)) self.mat_orientation_adv = StringVar() - orientation = ctk.CTkComboBox( + self.orientation_combobox = ctk.CTkComboBox( self.a_window, - width=100, + width=260, variable=self.mat_orientation_adv, values=("0", "180"), state="readonly", ) - orientation.grid(row=3, column=1, sticky=(W, E)) + self.orientation_combobox.grid(row=3, column=1, sticky=(W, E)) self.mat_orientation_adv.set("180") # Matrix code @@ -210,14 +196,14 @@ def __init__(self, parent): "GR04MM1305", "GR10MM0808", ) - matrix_code = ctk.CTkComboBox( + self.matrix_code_combobox = ctk.CTkComboBox( self.a_window, - width=150, + width=260, variable=self.mat_code_adv, values=matrix_code_vals, state="readonly", ) - matrix_code.grid(row=4, column=1, sticky=(W, E)) + self.matrix_code_combobox.grid(row=4, column=1, sticky=(W, E)) self.mat_code_adv.set("GR08MM1305") # Trace variable for updating window @@ -240,9 +226,38 @@ def __init__(self, parent): for child in self.a_window.winfo_children(): child.grid_configure(padx=5, pady=5) + # Check emgfile source and adjust available functionalities + try: + if self.parent.resdict["SOURCE"] == "DELSYS": + new_values = ("Persistent Inward Currents", ) + adv_box.configure(values=new_values) + self.advanced_method.set("Persistent Inward Currents") + self.enable_disable_a_window_elements(None) + except AttributeError: + pass # Allow to open advanced tools even if no file is loaded + except Exception as e: + show_error_dialog( + parent=self, error=e, + solution=str("An unexpected error occurred."), + ) + return + # ----------------------------------------------------------------------------------------------- # Advanced Analysis functionalities + def enable_disable_a_window_elements(self, _): + """ + Enable or disable comboboxes in a_window depending on the value of + self.advanced_method. + """ + + if self.advanced_method.get() == "Persistent Inward Currents": + self.orientation_combobox.configure(state="disabled") + self.matrix_code_combobox.configure(state="disabled") + else: + self.orientation_combobox.configure(state="readonly") + self.matrix_code_combobox.configure(state="readonly") + def on_matrix_none_adv(self, *args): """ Handle changes in the matrix code selection in the AdvancedAnalysis @@ -303,7 +318,8 @@ def advanced_analysis(self): depending on the advanced analysis option chosen by the user. It dynamically creates GUI elements like dropdowns, buttons, and checkboxes specific to the selected analysis tool, such as 'Motor Unit - Tracking', 'Conduction Velocity', or 'Duplicate Removal'. + Tracking', 'Conduction Velocity', 'Duplicate Removal' or 'Persistent + Inward Currents'. Raises ------ @@ -327,7 +343,7 @@ def advanced_analysis(self): # Set window icon head_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) iconpath = head_path + "/gui_files/Icon_transp.ico" - self.head.iconbitmap(default=iconpath) + self.head.iconbitmap(iconpath) if platform.startswith("win"): self.head.after(200, lambda: self.head.iconbitmap(iconpath)) @@ -343,7 +359,7 @@ def advanced_analysis(self): self.head.rowconfigure(row, weight=1) # Specify Signal - signal_value = ("OTB", "DEMUSE", "OPENHDEMG", "CUSTOMCSV") + signal_value = ("OPENHDEMG", "DEMUSE", "OTB", "CUSTOMCSV") signal_entry = ctk.CTkComboBox( self.head, width=150, @@ -471,6 +487,15 @@ def advanced_analysis(self): for child in self.head.winfo_children(): child.grid_configure(padx=5, pady=5) + # Add description for the show_checkbox for Motor Unit Tracking + if self.advanced_method.get() == "Motor Unit Tracking": + description_label = ctk.CTkLabel( + self.head, + text="Leave Show off if using \n the tracking GUI", + font=("Segoe UI", 10.5), + ) + description_label.grid(column=0, row=14, pady=(0, 4)) + # Add Which widget and update the track button # to match functionalities required for duplicate removal if self.advanced_method.get() == "Duplicate Removal": @@ -503,6 +528,9 @@ def advanced_analysis(self): self.head.destroy() self.a_window.destroy() + # Reload settings + self.parent.load_settings() + if self.mat_code_adv.get() == "None": # Get rows and columns and turn into list @@ -561,6 +589,7 @@ def advanced_analysis(self): n_firings=self.parent.settings.MUcv_gui__n_firings, muaps_timewindow=self.parent.settings.MUcv_gui__muaps_timewindow, figsize=self.parent.settings.MUcv_gui__figsize, + csv_separator=self.parent.settings.MUcv_gui__csv_separator, ) except AttributeError as e: @@ -585,8 +614,121 @@ def advanced_analysis(self): ) self.head.destroy() + # TODO better to do specific windows rather than modifying a default + if self.advanced_method.get() == "Persistent Inward Currents": + # Create appropriate widgets + for widget in self.head.winfo_children(): + widget.destroy() + + r = 0 + # Smoothing Method label + smoothing_method_label = ctk.CTkLabel( + self.head, text="Smoothing Method:", + font=("Segoe UI", 18, "bold"), + ) + smoothing_method_label.grid(column=0, row=r) + # Combobox for Smoothing Method + smoothing_method_combobox = ctk.CTkComboBox( + self.head, + values=("Support Vector Regression",), + variable=self.smoothing_method_adv, + state="disabled", + width=260, + ) + smoothing_method_combobox.grid(column=1, row=r) + self.smoothing_method_adv.set("Support Vector Regression") + + r += 1 + # Average Method label + average_method_label = ctk.CTkLabel( + self.head, text="Average Method:", + font=("Segoe UI", 18, "bold"), + ) + average_method_label.grid(column=0, row=r) + # Combobox for Average Method + average_method_combobox = ctk.CTkComboBox( + self.head, + values=("test_unit_average", "all"), + variable=self.average_method_adv, + state="readonly", + width=260, + ) + average_method_combobox.grid(column=1, row=r) + self.average_method_adv.set("test_unit_average") + + r += 1 + # Normalisation label + normalisation_label = ctk.CTkLabel( + self.head, text="Normalisation:", + font=("Segoe UI", 18, "bold"), + ) + normalisation_label.grid(column=0, row=r) + # Combobox for normalisation_label + normalisation_combobox = ctk.CTkComboBox( + self.head, + values=("False", "ctrl_max_desc"), + variable=self.normalisation_adv, + state="readonly", + width=260, + ) + normalisation_combobox.grid(column=1, row=r) + self.normalisation_adv.set("False") + + r += 1 + # clean label # TODO this may become a bool + clean_label = ctk.CTkLabel( + self.head, text="Clean:", + font=("Segoe UI", 18, "bold"), + ) + clean_label.grid(column=0, row=r) + # Combobox for normalisation_label + clean_combobox = ctk.CTkComboBox( + self.head, + values=("True", "False"), + variable=self.clean_adv, + state="readonly", + width=260, + ) + clean_combobox.grid(column=1, row=r) + self.clean_adv.set("True") + + r += 1 + text = ( + "Sort MUs based on recruitment order before PICs estimation." + + "\nThis can be done in the main window." + ) + description_label = ctk.CTkLabel( + self.head, + text=text, + font=("Segoe UI", 10.5), + ) + description_label.grid(column=1, row=r, pady=(5, 0)) + + r += 1 + # Add button to execute MU tracking + compute_pic_button = ctk.CTkButton( + self.head, + text="Compute PIC", + command=self.compute_pic, + ) + compute_pic_button.grid( + column=0, row=14, columnspan=2, sticky=(W, E), + ) + + # Add padding + for child in self.head.winfo_children(): + child.grid_configure(padx=5, pady=5) + + # Configure weights + for row in range(r): + self.head.rowconfigure(row, weight=1) + for column in range(4): + self.head.columnconfigure(column, weight=1) + # Destroy first window to avoid too many pop-ups - self.a_window.withdraw() # TODO check for reference as destroy will evoke tcl error upon continuing, whereas withdraw creates issue when closing. + self.a_window.withdraw() + # TODO check for reference as destroy will evoke tcl error upon + # continuing, whereas withdraw creates issue when closing. ### Define function for advanced analysis tools def open_emgfile1(self): @@ -768,7 +910,7 @@ def track_mus(self): openhdemg.tracking() """ - # Reload settings for tracking + # Reload settings for tracking and duplicates removal self.parent.load_settings() try: @@ -804,26 +946,31 @@ def track_mus(self): exclude_belowthreshold=self.exclude_thres.get(), filter=self.filter_adv.get(), show=self.show_adv.get(), + gui=self.parent.settings.tracking__gui, + gui_addrefsig=self.parent.settings.tracking__gui_addrefsig, + gui_csv_separator=self.parent.settings.tracking__gui_csv_separator, ) - # Add result terminal - track_terminal = ttk.LabelFrame( - self.head, text="MUs Tracking Result", height=100, - relief="ridge", - ) - track_terminal.grid( - column=2, - row=0, - columnspan=2, - rowspan=14, - pady=8, - padx=10, - sticky=(N, S, W, E), - ) + # Add result terminal, only if tracking__gui=False to avoid + # uninterrupted processes. + if not self.parent.settings.tracking__gui: + track_terminal = ttk.LabelFrame( + self.head, text="MUs Tracking Results", height=100, + relief="ridge", + ) + track_terminal.grid( + column=2, + row=0, + columnspan=2, + rowspan=14, + pady=8, + padx=10, + sticky=(N, S, W, E), + ) - # Add table containing results to the label frame - track_table = Table(track_terminal, dataframe=tracking_res) - track_table.show() + # Add table containing results to the label frame + track_table = Table(track_terminal, dataframe=tracking_res) + track_table.show() except AttributeError as e: show_error_dialog( @@ -905,6 +1052,7 @@ def remove_duplicates_between(self): custom_sorting_order=self.parent.settings.custom_sorting_order, filter=self.filter_adv.get(), show=self.show_adv.get(), + gui=False, which=self.which_adv.get(), ) @@ -943,3 +1091,81 @@ def remove_duplicates_between(self): + "\n - custom_sorting_order in settings" ), ) + + def compute_pic(self): + try: + # Reload settings + self.parent.load_settings() + + # Check that at least 1-2 MU is present, compute PICs and show results in the terminal + if self.parent.resdict["NUMBER_OF_MUS"] < 2: + show_error_dialog( + parent=self, + error=None, + solution=str( + "Not enough MUs for PICs estimation." + ), + ) + return + + # Get smoothed DR + print("\n-----------------------------\nDR smoothing in progress") + if self.smoothing_method_adv.get() == "Support Vector Regression": + svrfits = openhdemg.compute_svr( + emgfile=self.parent.resdict, + gammain=self.parent.settings.compute_svr__gammain, + regparam=self.parent.settings.compute_svr__regparam, + endpointweights_numpulses=self.parent.settings.compute_svr__endpointweights_numpulses, + endpointweights_magnitude=self.parent.settings.compute_svr__endpointweights_magnitude, + discontfiring_dur=self.parent.settings.compute_svr__discontfiring_dur, + ) + else: + pass # TODO to implement + + print( + "-----------------------------\nDeltaF estimation in progress" + ) + delta_f = openhdemg.compute_deltaf( + emgfile=self.parent.resdict, + smoothfits=svrfits["gensvr"], + average_method=self.average_method_adv.get(), + normalisation=self.normalisation_adv.get(), + recruitment_difference_cutoff=self.parent.settings.compute_deltaf__recruitment_difference_cutoff, + corr_cutoff=self.parent.settings.compute_deltaf__corr_cutoff, + controlunitmodulation_cutoff=self.parent.settings.compute_deltaf__controlunitmodulation_cutoff, + clean=self.clean_adv.get(), + ) + print( + "-----------------------------\nDeltaF estimation finished" + + "\n-----------------------------\n" + ) + + # Add results terminal + track_terminal = ttk.LabelFrame( + self.head, text="MUs PICs Results", height=100, width=100, + relief="ridge", + ) + track_terminal.grid( + column=2, + row=0, + columnspan=2, + rowspan=15, + pady=8, + padx=10, + sticky=(N, S, W, E), + ) + + # Add table containing results to the label frame + track_table = Table(track_terminal, dataframe=delta_f) + track_table.show() + + except AttributeError as e: + show_error_dialog( + parent=self, + error=e, + solution=str( + "Please make sure to load a file in the main window " + + "for Persistent Inward Currents estimation." + ), + ) + self.head.destroy() diff --git a/openhdemg/gui/gui_modules/mu_properties.py b/openhdemg/gui/gui_modules/mu_properties.py index ea75c24..7022c63 100644 --- a/openhdemg/gui/gui_modules/mu_properties.py +++ b/openhdemg/gui/gui_modules/mu_properties.py @@ -318,6 +318,9 @@ def compute_mu_threshold(self): compute_thresholds in library. """ + # Update settings + self.parent.load_settings() + try: # Compute thresholds self.parent.mu_thresholds = openhdemg.compute_thresholds( @@ -374,6 +377,9 @@ def compute_mu_dr(self): compute_dr in library. """ + # Update settings + self.parent.load_settings() + try: # Compute discharge rates self.parent.mus_dr = openhdemg.compute_dr( @@ -381,6 +387,7 @@ def compute_mu_dr(self): n_firings_RecDerec=int(self.firings_rec.get()), n_firings_steady=int(self.firings_ste.get()), event_=self.dr_event.get(), + idr_range=self.parent.settings.compute_dr__idr_range, ) # Display results self.parent.display_results(self.parent.mus_dr) @@ -433,6 +440,9 @@ def basic_mus_properties(self): basic_mus_properties in library. """ + # Update settings + self.parent.load_settings() + try: # Calculate properties self.parent.mu_prop_df = openhdemg.basic_mus_properties( @@ -440,6 +450,7 @@ def basic_mus_properties(self): n_firings_rt_dert=self.parent.settings.basic_mus_properties__n_firings_rt_dert, n_firings_RecDerec=int(self.b_firings_rec.get()), n_firings_steady=int(self.b_firings_ste.get()), + idr_range=self.parent.settings.basic_mus_properties__idr_range, accuracy=self.parent.settings.basic_mus_properties__accuracy, ignore_negative_ipts=self.parent.settings.basic_mus_properties__ignore_negative_ipts, constrain_pulses=self.parent.settings.basic_mus_properties__constrain_pulses, diff --git a/openhdemg/gui/settings.py b/openhdemg/gui/settings.py index fb335ea..b477037 100644 --- a/openhdemg/gui/settings.py +++ b/openhdemg/gui/settings.py @@ -19,30 +19,31 @@ import numpy as np + # --------------------------------- openfiles --------------------------------- -# in emg_from_demuse() +# In emg_from_demuse() emg_from_demuse__ignore_negative_ipts = False -# in emg_from_otb() +# In emg_from_otb() emg_from_otb__ext_factor = 8 emg_from_otb__refsig = [True, "fullsampled"] emg_from_otb__extras = None emg_from_otb__ignore_negative_ipts = False -# in refsig_from_otb() +# In refsig_from_otb() refsig_from_otb__refsig = "fullsampled" refsig_from_otb__extras = None -# in emg_from_delsys() +# In emg_from_delsys() emg_from_delsys__emg_sensor_name = "Galileo sensor" emg_from_delsys__refsig_sensor_name = "Trigno Load Cell" emg_from_delsys__filename_from = "mus_directory" -# in refsig_from_delsys() +# In refsig_from_delsys() refsig_from_delsys__refsig_sensor_name = "Trigno Load Cell" -# in emg_from_customcsv() +# In emg_from_customcsv() emg_from_customcsv__ref_signal = "REF_SIGNAL" emg_from_customcsv__raw_signal = "RAW_SIGNAL" emg_from_customcsv__ipts = "IPTS" @@ -53,48 +54,71 @@ emg_from_customcsv__fsamp = 2048 emg_from_customcsv__ied = 8 -# in refsig_from_customcsv() +# In refsig_from_customcsv() refsig_from_customcsv__ref_signal = "REF_SIGNAL" refsig_from_customcsv__extras = "EXTRAS" refsig_from_customcsv__fsamp = 2048 -# in save_json_emgfile() +# In save_json_emgfile() save_json_emgfile__compresslevel = 4 # ---------------------------------- analysis --------------------------------- -# in compute_thresholds() +# In compute_thresholds() compute_thresholds__n_firings = 1 -# in basic_mus_properties() +# In compute_dr() +compute_dr__idr_range = None + +# In basic_mus_properties() basic_mus_properties__n_firings_rt_dert = 1 basic_mus_properties__accuracy = "default" basic_mus_properties__ignore_negative_ipts = False basic_mus_properties__constrain_pulses = [True, 3] +basic_mus_properties__idr_range = None # ----------------------------------- tools ----------------------------------- -# in resize_emgfile() +# In resize_emgfile() resize_emgfile__how = "ref_signal" resize_emgfile__accuracy = "recalculate" resize_emgfile__ignore_negative_ipts = False +# In compute_svr() +compute_svr__gammain = 1/1.6 +compute_svr__regparam = 1/0.370 +compute_svr__endpointweights_numpulses = 5 +compute_svr__endpointweights_magnitude = 5 +compute_svr__discontfiring_dur = 1.0 + + +# ------------------------------------ pic ------------------------------------ +# In compute_deltaf() +compute_deltaf__recruitment_difference_cutoff = 1.0 +compute_deltaf__corr_cutoff = 0.7 +compute_deltaf__controlunitmodulation_cutoff = 0.5 + # ------------------------------------ muap ----------------------------------- -# in tracking() +# In tracking() tracking__firings = "all" tracking__derivation = "sd" +tracking__gui = True +tracking__gui_addrefsig = True +tracking__gui_csv_separator = "\t" -# in remove_duplicates_between() +# In remove_duplicates_between() remove_duplicates_between__firings = "all" remove_duplicates_between__derivation = "sd" -# in MUcv_gui() +# In MUcv_gui() MUcv_gui__n_firings = [0, 50] MUcv_gui__muaps_timewindow = 50 MUcv_gui__figsize = [25, 20] +MUcv_gui__csv_separator = "\t" + # --------------------------------- electrodes -------------------------------- # This custom sorting order is valid for all the GUI windows, although the diff --git a/openhdemg/library/__init__.py b/openhdemg/library/__init__.py index 1df624e..58a4fc1 100644 --- a/openhdemg/library/__init__.py +++ b/openhdemg/library/__init__.py @@ -7,6 +7,7 @@ "otbelectrodes", "muap", "info", + "pic" ] from openhdemg.library.openfiles import ( @@ -30,3 +31,4 @@ from openhdemg.library.electrodes import * from openhdemg.library.muap import * from openhdemg.library.info import * +from openhdemg.library.pic import * diff --git a/openhdemg/library/analysis.py b/openhdemg/library/analysis.py index d8f297f..25c0ae4 100644 --- a/openhdemg/library/analysis.py +++ b/openhdemg/library/analysis.py @@ -196,6 +196,7 @@ def compute_dr( start_steady=-1, end_steady=-1, event_="rec_derec_steady", + idr_range=None, ): """ Calculate the discharge rate (DR). @@ -233,6 +234,12 @@ def compute_dr( ``steady`` DR is calculated during the steady-state phase. + idr_range : None or list, default None + If idr_range is a list [lower_limit, upper_limit], only firings with an + instantaneous discharge rate (IDR) within the limits are used for DR + calculation. lower_limit and upper_limit should be in pulses per + second. See examples section. + If idr_range is None, all the firings are used for DR calculation. Returns ------- @@ -300,6 +307,21 @@ def compute_dr( 1 14.440561 10.019572 11.822081 11.683134 2 7.293547 5.846093 7.589531 8.055731 3 13.289651 9.694317 11.613640 11.109796 + + The firings used for DR calculation can be filtered to avoid too closed + firings (e.g., doublets) or intervals of inactivity. + + >>> emgfile = emg.emg_from_samplefile() + >>> idr = emg.compute_dr( + ... emgfile, start_steady=15000, end_steady=51000, idr_range=[1, 50], + ... ) + >>> idr + DR_rec DR_derec ... DR_end_steady DR_all_steady DR_all + 0 3.341579 4.606835 ... 8.506270 7.895837 7.657269 + 1 5.701081 4.662196 ... 6.271989 6.916566 6.814687 + 2 5.699017 3.691367 ... 7.221309 8.183408 7.949294 + 3 7.548770 5.449581 ... 10.399834 11.164994 10.693076 + 4 8.344515 5.333535 ... 9.694317 10.750855 10.543011 """ # Check that all the inputs are correct @@ -324,6 +346,23 @@ def compute_dr( idr = compute_idr(emgfile=emgfile) + # Filter firings outside the idr_range, if required + if idr_range is not None: + if not isinstance(idr_range, list): + raise ValueError( + "idr_range can be None or a list of 2 numbers. " + + f"A{type(idr_range)} was passed instead." + ) + else: + if len(idr_range) != 2: + raise ValueError( + "idr_range can be None or a list of 2 numbers. " + + f"The list contains {len(idr_range)} numbers instead." + ) + for mu in idr.keys(): + idr[mu]["idr"] = idr[mu]["idr"][idr[mu]["idr"] > idr_range[0]] + idr[mu]["idr"] = idr[mu]["idr"][idr[mu]["idr"] < idr_range[1]] + # Check if we need to manually select the area for the steady-state phase title = ( "Select the start/end area of the steady-state by hovering the mouse" + @@ -457,6 +496,7 @@ def basic_mus_properties( n_firings_steady=10, start_steady=-1, end_steady=-1, + idr_range=None, accuracy="default", ignore_negative_ipts=False, constrain_pulses=[True, 3], @@ -496,6 +536,13 @@ def basic_mus_properties( The start and end point (in samples) of the steady-state phase. If < 0 (default), the user will need to manually select the start and end of the steady-state phase. + idr_range : None or list, default None + If idr_range is a list [lower_limit, upper_limit], only firings with an + instantaneous discharge rate (IDR) within the limits are used for DR + and coefficient of variation of interspike interval (COVisi) + calculation. lower_limit and upper_limit should be in pulses per + second. See compute_dr() examples section. + If idr_range is None, all the firings are used for DR calculation. accuracy : str {"default", "SIL", "PNR", "SIL_PNR"}, default "default" The accuracy measure to return. @@ -732,6 +779,7 @@ def basic_mus_properties( n_firings_steady=n_firings_steady, start_steady=start_steady, end_steady=end_steady, + idr_range=idr_range, ) exportable_df = pd.concat([exportable_df, mus_dr], axis=1) @@ -742,6 +790,7 @@ def basic_mus_properties( start_steady=start_steady, end_steady=end_steady, event_="steady", + idr_range=idr_range, ) exportable_df = pd.concat([exportable_df, covisi], axis=1) @@ -763,6 +812,7 @@ def compute_covisi( start_steady=-1, end_steady=-1, event_="rec_derec_steady", + idr_range=None, single_mu_number=-1, ): """ @@ -800,6 +850,12 @@ def compute_covisi( ``steady`` covisi is calculated during the steady-state phase. + idr_range : None or list, default None + If idr_range is a list [lower_limit, upper_limit], only firings with an + instantaneous discharge rate (IDR) within the limits are used for + COVisi calculation. lower_limit and upper_limit should be in pulses per + second. See compute_dr() examples section. + If idr_range is None, all the firings are used for DR calculation. single_mu_number : int, default -1 Number of the specific MU to compute the COVisi. If single_mu_number >= 0, only the COVisi of the entire contraction @@ -884,6 +940,29 @@ def compute_covisi( # We use the idr pd.DataFrame to calculate the COVisi idr = compute_idr(emgfile=emgfile) + # Filter firings outside the idr_range, if required + if idr_range is not None: + if not isinstance(idr_range, list): + raise ValueError( + "idr_range can be None or a list of 2 numbers. " + + f"A{type(idr_range)} was passed instead." + ) + else: + if len(idr_range) != 2: + raise ValueError( + "idr_range can be None or a list of 2 numbers. " + + f"The list contains {len(idr_range)} numbers instead." + ) + idr_range[0] = emgfile["FSAMP"] / idr_range[0] + idr_range[1] = emgfile["FSAMP"] / idr_range[1] + for mu in idr.keys(): + idr[mu]["diff_mupulses"] = idr[mu]["diff_mupulses"][ + idr[mu]["diff_mupulses"] < idr_range[0] + ] + idr[mu]["diff_mupulses"] = idr[mu]["diff_mupulses"][ + idr[mu]["diff_mupulses"] > idr_range[1] + ] + # Check if we need to analyse all the MUs or a single MU if single_mu_number < 0: # Check if we need to manually select the area for the steady-state @@ -981,6 +1060,7 @@ def compute_drvariability( start_steady=-1, end_steady=-1, event_="rec_derec_steady", + idr_range=None, ): """ Calculate the DR variability. @@ -1018,6 +1098,12 @@ def compute_drvariability( ``steady`` DR variability is calculated during the steady-state phase. + idr_range : None or list, default None + If idr_range is a list [lower_limit, upper_limit], only firings with an + instantaneous discharge rate (IDR) within the limits are used for DR + variability calculation. lower_limit and upper_limit should be in + pulses per second. See compute_dr() examples section. + If idr_range is None, all the firings are used for DR calculation. Returns ------- @@ -1090,6 +1176,23 @@ def compute_drvariability( # We use the idr pd.DataFrame to calculate the COVisi idr = compute_idr(emgfile=emgfile) + # Filter firings outside the idr_range, if required + if idr_range is not None: + if not isinstance(idr_range, list): + raise ValueError( + "idr_range can be None or a list of 2 numbers. " + + f"A{type(idr_range)} was passed instead." + ) + else: + if len(idr_range) != 2: + raise ValueError( + "idr_range can be None or a list of 2 numbers. " + + f"The list contains {len(idr_range)} numbers instead." + ) + for mu in idr.keys(): + idr[mu]["idr"] = idr[mu]["idr"][idr[mu]["idr"] > idr_range[0]] + idr[mu]["idr"] = idr[mu]["idr"][idr[mu]["idr"] < idr_range[1]] + # Check if we need to manually select the area for the steady-state phase if event_ == "rec_derec_steady" or event_ == "steady": title = ( diff --git a/openhdemg/library/mathtools.py b/openhdemg/library/mathtools.py index f08b7aa..dac3022 100644 --- a/openhdemg/library/mathtools.py +++ b/openhdemg/library/mathtools.py @@ -18,7 +18,7 @@ def min_max_scaling(series_or_df): """ Min-max scaling of pd.series or pd.dataframes. - Min-max feature scaling is often simply referred to as normalization, + Min-max feature scaling is often simply referred to as normalisation, which rescales the dataset feature to a range of 0 - 1. It's calculated by subtracting the feature's minimum value from the value and then dividing it by the difference between the maximum and minimum @@ -119,19 +119,19 @@ def norm_xcorr(sig1, sig2, out="both"): def norm_twod_xcorr(df1, df2, mode="full"): """ - Normalised 2-dimensional cross-correlation of STAs of two MUS. + Normalised 2-dimensional cross-correlation of 2. - Any pre-processing of the RAW_SIGNAL (i.e., normal, differential or double - differential) can be passed as long as the two inputs have same shape. + The two inputs must have same shape. + When this function is used to cross-correlate MUAPs obtained via STA, + df1 and df2 should contain the unpacked STA of the first and second MU, + respectively, without np.nan columns. Parameters ---------- df1 : pd.DataFrame - A pd.DataFrame containing the STA of the first MU without np.nan - column. + A pd.DataFrame containing the first 2-dimensional signal. df2 : pd.DataFrame - A pd.DataFrame containing the STA of the second MU without np.nan - column. + A pd.DataFrame containing the second 2-dimensional signal. mode : str {"full", "valid", "same"}, default "full" A string indicating the size of the output: @@ -692,7 +692,7 @@ def mle_cv_est(sig, initial_teta, ied, fsamp): Examples -------- - Refer to the examples of find_teta to obtain sig and initial_teta. + Refer to the examples of find_mle_teta to obtain sig and initial_teta. """ # Calculate ied in meters @@ -737,7 +737,7 @@ def mle_cv_est(sig, initial_teta, ied, fsamp): return cv, teta -def find_teta(sig1, sig2, ied, fsamp): +def find_mle_teta(sig1, sig2, ied, fsamp): """ Find the starting value for teta. @@ -811,7 +811,7 @@ def find_teta(sig1, sig2, ied, fsamp): Third, estimate teta. - >>> initial_teta = emg.find_teta( + >>> initial_teta = emg.find_mle_teta( ... sig1=sig1, ... sig2=sig2, ... ied=emgfile["IED"], diff --git a/openhdemg/library/muap.py b/openhdemg/library/muap.py index b0a2067..d13febb 100644 --- a/openhdemg/library/muap.py +++ b/openhdemg/library/muap.py @@ -8,11 +8,11 @@ from openhdemg.library.mathtools import ( norm_twod_xcorr, norm_xcorr, - find_teta, + find_mle_teta, mle_cv_est, ) from openhdemg.library.electrodes import sort_rawemg -from openhdemg.library.plotemg import plot_muaps, plot_muaps_for_cv +from openhdemg.library.plotemg import plot_idr, plot_muaps, plot_muaps_for_cv from scipy import signal import matplotlib.pyplot as plt from functools import reduce @@ -21,11 +21,9 @@ from joblib import Parallel, delayed import copy import os -import warnings import tkinter as tk from tkinter import ttk from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg -import pyperclip def diff(sorted_rawemg): @@ -688,7 +686,7 @@ def align_by_xcorr(sta_mu1, sta_mu2, finalduration=0.5): no_nan_sta2 = df2.dropna(axis=1, inplace=False) # Compute 2dxcorr to identify a common lag/delay - normxcorr_df, normxcorr_max = norm_twod_xcorr( + normxcorr_df, _ = norm_twod_xcorr( no_nan_sta1, no_nan_sta2, mode="same" ) @@ -737,10 +735,7 @@ def align_by_xcorr(sta_mu1, sta_mu2, finalduration=0.5): # TODO update examples for code="None" - -# This function exploits parallel processing: -# - align and xcorr are processed in parallel -# - plotting is processed in parallel +# This function exploits parallel processing for MUAPs alignment and xcorr def tracking( emgfile1, emgfile2, @@ -756,11 +751,18 @@ def tracking( custom_muaps=None, exclude_belowthreshold=True, filter=True, + multiprocessing=True, show=False, + gui=True, + gui_addrefsig=True, + gui_csv_separator="\t", ): """ Track MUs across two files comparing the MUAPs' shape and distribution. + It is also possible to use a convenient GUI for the inspection of the + obtained MU pairs. + Parameters ---------- emgfile1 : dict @@ -841,14 +843,31 @@ def tracking( filter : bool, default True If true, when the same MU has a match of XCC > threshold with multiple MUs, only the match with the highest XCC is returned. + multiprocessing : bool, default True + If True (default) parallel processing will be used to reduce execution + time. show : bool, default False - Whether to plot the STA of pairs of MUs with XCC above threshold. + Whether to plot the STA of pairs of MUs with XCC above threshold. Set + to False (default) when gui=True to avoid postponing the GUI execution. + If show=True and gui=True, the GUI will be executed after closing all + the figures. + gui : bool, default True + If True (default) a GUI for the visual inspection and manual selection + of the tracking results will be called. + gui_addrefsig : bool, default True + If True, the REF_SIGNAL is plotted in front of the IDR with a + separated y-axes. This is used only when gui=True. + gui_csv_separator : str, default "\t" + The field delimiter used by the GUI to create the .csv copied to the + clipboard. This is used only when gui=True. Returns ------- tracking_res : pd.DataFrame The results of the tracking including the MU from file 1, MU from file 2 and the normalised cross-correlation value (XCC). + If gui=True, an additional column indicating the inclusion/exclusion + of the MUs pairs will also be present in tracking_res. Warns ----- @@ -860,18 +879,39 @@ def tracking( - sta : computes the STA of every MUs. - norm_twod_xcorr : normalised 2-dimensional cross-correlation of STAs of two MUs. + - Tracking_gui : GUI for the visual inspection and manual selection of the + tracking results (directly callable from the tracking function). - remove_duplicates_between : remove duplicated MUs across two different files based on STA. Notes ----- - Parallel processing can improve performances by 5-10 times compared to + Parallel processing can significantly improve performances compared to serial processing. In this function, parallel processing has been - implemented for the tasks involving 2-dimensional cross-correlation, and - plotting. This might change in future releases. + implemented for the tasks involving 2-dimensional cross-correlation. Examples -------- + Track MUs between two OPENHDEMG (.json) files and inspect the results with + a convenient GUI. + + >>> import openhdemg.library as emg + >>> emgfile1 = emg.askopenfile(filesource="OPENHDEMG") + >>> emgfile2 = emg.askopenfile(filesource="OPENHDEMG") + >>> tracking_res = emg.tracking( + ... emgfile1=emgfile1, + ... emgfile2=emgfile2, + ... firings="all", + ... derivation="sd", + ... timewindow=50, + ... threshold=0.8, + ... matrixcode="GR08MM1305", + ... orientation=180, + ... filter=True, + ... show=False, + ... gui=True, + >>> ) + Track MUs between two OTB files and show the filtered results. >>> import openhdemg.library as emg @@ -891,6 +931,7 @@ def tracking( ... exclude_belowthreshold=True, ... filter=True, ... show=False, + ... gui=False, ... ) MU_file1 MU_file2 XCC 0 0 3 0.820068 @@ -932,6 +973,7 @@ def tracking( ... exclude_belowthreshold=True, ... filter=True, ... show=False, + ... gui=False, ... ) """ @@ -995,7 +1037,7 @@ def tracking( else: raise ValueError("custom_muaps is not a list of two dictionaries") - print("\nTracking started") + print("\nTracking started:") # Tracking function to run in parallel def parallel(mu_file1): # Loop all the MUs of file 1 @@ -1020,7 +1062,7 @@ def parallel(mu_file1): # Loop all the MUs of file 1 df1.dropna(axis=1, inplace=True) df2, _ = unpack_sta(aligned_sta2) df2.dropna(axis=1, inplace=True) - normxcorr_df, normxcorr_max = norm_twod_xcorr( + _, normxcorr_max = norm_twod_xcorr( df1, df2, mode="full" ) @@ -1037,19 +1079,29 @@ def parallel(mu_file1): # Loop all the MUs of file 1 return res - # Start parallel execution - # Measure running time - t0 = time.time() - - res = Parallel(n_jobs=-1)( - delayed(parallel)(mu_file1) for mu_file1 in range(emgfile1["NUMBER_OF_MUS"]) - ) + if multiprocessing: + # Start parallel execution + res = Parallel(n_jobs=-1, verbose=1)( + delayed(parallel)(mu_file1) for mu_file1 in range(emgfile1["NUMBER_OF_MUS"]) + ) + print("\n") - t1 = time.time() - print(f"\nTime of tracking parallel processing: {round(t1-t0, 2)} Sec\n") + else: + # Start serial execution + t0 = time.time() + + res = [] + for pos, mu_file1 in enumerate(range(emgfile1["NUMBER_OF_MUS"])): + res.append(parallel(mu_file1)) + # Show progress + t1 = time.time() + print( + f"Done {pos+1} out of {emgfile1['NUMBER_OF_MUS']} | " + + f"Elapsed time: {round(t1-t0, 2)} Sec", + ) + print("\n") # Convert res to pd.DataFrame - tracking_res = [] for pos, i in enumerate(res): if pos == 0: tracking_res = pd.DataFrame(i) @@ -1060,32 +1112,39 @@ def parallel(mu_file1): # Loop all the MUs of file 1 # Filter the results if filter: # Sort file by MUs in file 1 and XCC to have first the highest XCC - sorted_res = tracking_res.sort_values( - by=["MU_file1", "XCC"], ascending=False + tracking_res = tracking_res.sort_values( + by=["MU_file1", "XCC"], ascending=False, ) + # Get unique MUs from file 1 - unique = sorted_res["MU_file1"].unique() + unique = tracking_res["MU_file1"].unique() - res_unique = pd.DataFrame(columns=sorted_res.columns) + res_unique = pd.DataFrame(columns=tracking_res.columns) # Get the combo uf unique MUs from file 1 with MUs from file 2 for pos, mu1 in enumerate(unique): - this_res = sorted_res[sorted_res["MU_file1"] == mu1] + this_res = tracking_res[tracking_res["MU_file1"] == mu1] # Fill the result df with the first row (highest XCC) res_unique.loc[pos, :] = this_res.iloc[0, :] # Now repeat the task with MUs from file 2 - sorted_res = res_unique.sort_values( + tracking_res = res_unique.sort_values( by=["MU_file2", "XCC"], ascending=False ) - unique = sorted_res["MU_file2"].unique() - res_unique = pd.DataFrame(columns=sorted_res.columns) + unique = tracking_res["MU_file2"].unique() + res_unique = pd.DataFrame(columns=tracking_res.columns) for pos, mu2 in enumerate(unique): - this_res = sorted_res[sorted_res["MU_file2"] == mu2] + this_res = tracking_res[tracking_res["MU_file2"] == mu2] res_unique.loc[pos, :] = this_res.iloc[0, :] tracking_res = res_unique.sort_values(by=["MU_file1"]) + else: + # Sort file by MUs in file 1 and XCC to have first the highest XCC + tracking_res = tracking_res.sort_values( + by=["MU_file1", "XCC"], ascending=[True, False], + ) + # Print the full results pd.set_option("display.max_rows", None) convert_dict = {"MU_file1": int, "MU_file2": int, "XCC": float} @@ -1097,7 +1156,7 @@ def parallel(mu_file1): # Loop all the MUs of file 1 # Plot the MUs pairs if show: - def parallel(ind): # Function for the parallel execution of plotting + for ind in tracking_res.index: if tracking_res["XCC"].loc[ind] >= threshold: # Align STA if not isinstance(custom_muaps, list): @@ -1121,18 +1180,422 @@ def parallel(ind): # Function for the parallel execution of plotting showimmediately=False ) - plt.show() + if ind == tracking_res.index[-1]: + plt.show(block=True) + else: + plt.show(block=False) + + # Call the GUI and return the tracking_res + if gui: + # Check if no pairs have been detected + if tracking_res.empty: + return tracking_res + + tracking_gui = Tracking_gui( + emgfile1=emgfile1, + emgfile2=emgfile2, + tracking_res=tracking_res, + sta_emgfile1=sta_emgfile1, + sta_emgfile2=sta_emgfile2, + align_muaps=False if isinstance(custom_muaps, list) else True, + addrefsig=gui_addrefsig, + csv_separator=gui_csv_separator, + ) + + # Get and return updated tracking_res + return tracking_gui.get_results() + + else: + return tracking_res + + +class Tracking_gui(): # TODO add delete excluded pairs button + """ + GUI for the visual inspection and manual selection of the tracking results. + + Parameters + ---------- + emgfile1 : dict + The dictionary containing the first emgfile. + emgfile2 : dict + The dictionary containing the second emgfile. + tracking_res : pd.DataFrame + The results of the tracking including the MU from file 1, + MU from file 2 and the normalised cross-correlation value (XCC). + This is obtained with the function `tracking()`. + sta_emgfile1 : dict + dict containing a dict of STA (pd.DataFrame) for every MUs from + emgfile1. This is obtained with the function `sta()`. + sta_emgfile2 : dict + dict containing a dict of STA (pd.DataFrame) for every MUs from + emgfile2. This is obtained with the function `sta()`. + align_muaps : bool, default True + Whether to align the MUAPs before plotting. If true, the visualised + MUAPs time window will be 1/2 of the original (because the maximum + allowed shift during MUAPs alignment is 50%). + addrefsig : bool, default True + If True, the REF_SIGNAL is plotted in front of the IDR with a + separated y-axes. + csv_separator : str, default "\t" + The field delimiter used to create the .csv copied to the clipboard. + + Methods + ------- + get_results() + Returns the results of the tracking including the MU from file 1, + MU from file 2, the normalised cross-correlation value (XCC) and the + inclusion or exclusion of the MUs pair. after the GUI is closed. + + See also + -------- + - tracking : Track MUs across two files comparing the MUAPs' shape and + distribution. + + Examples + -------- + Track MUs between two files. + + >>> import openhdemg.library as emg + >>> emgfile_1 = emg.askopenfile(filesource="OPENHDEMG") + >>> emgfile_2 = emg.askopenfile(filesource="OPENHDEMG") + >>> tracking_res = emg.tracking(emgfile_1, emgfile_2, timewindow=50) + + Obtained required variables for Tracking_gui(). Pay attention to use the + same derivation specified during tracking and to use an appropriate MUAPs + timewindow, according to the align_muaps option in Tracking_gui(). + + >>> sorted_rawemg_1 = emg.sort_rawemg(emgfile_1) + >>> sorted_rawemg_1 = emg.diff(sorted_rawemg_1) + >>> sta_dict_1 = emg.sta(emgfile_1, sorted_rawemg_1, timewindow=100) + >>> sorted_rawemg_2 = emg.sort_rawemg(emgfile_2) + >>> sorted_rawemg_2 = emg.diff(sorted_rawemg_2) + >>> sta_dict_2 = emg.sta(emgfile_2, sorted_rawemg_2, timewindow=100) + + Inspect the tracking results with a convenient GUI. + + >>> tracking = emg.Tracking_gui( + ... emgfile1=emgfile_1, + ... emgfile2=emgfile_2, + ... tracking_res=tracking_res, + ... sta_emgfile1=sta_dict_1, + ... sta_emgfile2=sta_dict_2, + ... align_muaps=True, + ... addrefsig=True, + ... ) + + Return the updated results for further use in the code. + + >>> updated_results = tracking.get_results() + MU_file1 MU_file2 XCC Inclusion + 0 0 2 0.897364 Excluded + 1 1 0 0.947486 Included + 2 2 1 0.923901 Included + 3 4 8 0.893922 Included + """ + def __init__( + self, + emgfile1, + emgfile2, + tracking_res, + sta_emgfile1, + sta_emgfile2, + align_muaps=True, + addrefsig=True, + csv_separator="\t", + ): + # Define needed variables + self.emgfile1 = emgfile1 + self.emgfile2 = emgfile2 + self.tracking_res = tracking_res + self.sta_emgfile1 = sta_emgfile1 + self.sta_emgfile2 = sta_emgfile2 + self.align_muaps = align_muaps + self.addrefsig = addrefsig + self.csv_separator = csv_separator + + # Add included/excluded label to self.tracking_res + self.tracking_res = self.tracking_res.assign(Inclusion="Included") + + # Define GUI structure + # After that, set up the GUI + self.root = tk.Tk() + self.root.title('MUAPs tracking') + root_path = os.path.dirname(os.path.abspath(__file__)) + iconpath = os.path.join( + root_path, + "..", + "gui", + "gui_files", + "Icon_transp.ico" + ) + self.root.iconbitmap(iconpath) + + # Create outer frames, assign structure and minimum spacing + # Top + top_frm = tk.Frame(self.root, padx=10) + top_frm.grid( + row=0, column=0, columnspan=3, sticky=tk.NSEW, pady=(10, 8), + ) + # Central - Left + self.central_left_frm = tk.Frame( + self.root, padx=1, pady=1, background="gray", + ) + self.central_left_frm.grid( + row=1, rowspan=2, column=0, columnspan=2, sticky=tk.NSEW, + ) + # Central - Right Top + self.central_right_top_frm = tk.Frame( + self.root, padx=0, pady=1, background="gray", + ) + self.central_right_top_frm.grid( + row=1, column=2, columnspan=1, sticky=tk.NSEW, + ) + # Central - Right Bottom + self.central_right_bottom_frm = tk.Frame( + self.root, padx=0, pady=1, background="gray", + ) + self.central_right_bottom_frm.grid( + row=2, column=2, columnspan=1, sticky=tk.NSEW, + ) + # Bottom + bottom_frm = tk.Frame(self.root, padx=1, pady=0, background="gray") + bottom_frm.grid(row=3, column=0, columnspan=3, sticky=tk.NSEW) + + # Assign rows and columns weight to avoid empty spaces + self.root.rowconfigure(1, weight=1) + self.root.rowconfigure(2, weight=1) + + self.root.columnconfigure(0, weight=1) + self.root.columnconfigure(1, weight=1) + self.root.columnconfigure(2, weight=1) + + # Label MU pair selection and create combobox to change MUs pair + # File 1 + mu_pair_label = ttk.Label(top_frm, text="Pair of MUs to visualise:") + mu_pair_label.pack(side=tk.LEFT) + + mu_pairs = list(self.tracking_res.index) + self.select_mu_pair_cb = ttk.Combobox( + top_frm, + values=mu_pairs, + state='readonly', + width=5, + ) + self.select_mu_pair_cb.pack(side=tk.LEFT, padx=(2, 15)) + self.select_mu_pair_cb.current(0) + # gui_plot() takes one positional argument (self), but the bind() + # method is passing two arguments: the event object and the function + # itself. Use lambda to avoid the error. + self.select_mu_pair_cb.bind( + '<>', + lambda event: self.gui_plot(), + ) + + # Button to copy the dataframe to clipboard + copy_btn = ttk.Button( + top_frm, + text="Copy results", + command=self.copy_to_clipboard, + ) + copy_btn.pack(side=tk.RIGHT, padx=(20, 0)) + + # Button to include/exclude MUAPs + btn_include_muaps = ttk.Button( + top_frm, + text="Include/Exclude", + command=self.include_exclude, + ) + btn_include_muaps.pack(side=tk.RIGHT, padx=(0, 20)) + + # Inclusion label + self.inclusion_label = ttk.Label( + top_frm, text="INCLUDED", foreground="green", + ) + self.inclusion_label.pack(side=tk.RIGHT, padx=(20, 20)) + + # Text frame to show the tracking results + self.textbox = tk.Text(bottom_frm, height=10, relief="flat") + self.textbox.pack( + expand=True, side=tk.BOTTOM, fill=tk.X) + self.textbox.insert( + '1.0', + self.tracking_res.to_string(float_format="{:.2f}".format), + ) + + # Highlight the specific row + self.highlight_row(0) + + # Lower the window to prevent flashing + self.root.lower() + + # Plot the first MU pair + self.gui_plot() + + # Bring back the GUI in the foreground + self.root.lift() + self.root.attributes('-topmost', True) + self.root.after_idle(self.root.attributes, '-topmost', False) + + # Start mainloop + self.root.mainloop() + + def highlight_row(self, row_number): + # Highlight a row in self.textbox + + # Define a tag for highlighting + self.textbox.tag_configure("highlight", background="yellow") + + # Update rownumber for the table format + row_number += 2 + + # Calculate the start and end index of the row in the text widget + start_index = f"{row_number}.0" + end_index = f"{row_number}.end" + + # Add the highlight tag to the row + self.textbox.tag_add("highlight", start_index, end_index) + + def gui_plot(self): + # Display the MUAPs and IDR. + + # Update the textbox highlight + pair = int(self.select_mu_pair_cb.get()) + self.textbox.tag_delete("highlight") + self.highlight_row(pair) + + # Update the inclusion_label + if self.tracking_res.loc[pair, "Inclusion"] == "Excluded": + self.inclusion_label.config( + text="EXCLUDED", + foreground="red" + ) + else: + self.inclusion_label.config( + text="INCLUDED", + foreground="green" + ) + + # Get MUs number + mu1 = int(self.tracking_res["MU_file1"].loc[pair]) + mu2 = int(self.tracking_res["MU_file2"].loc[pair]) + + # MUAPs figure + if self.align_muaps: + aligned_sta1, aligned_sta2 = align_by_xcorr( + self.sta_emgfile1[mu1], + self.sta_emgfile2[mu2], + finalduration=0.5, + ) + muaps_fig = plot_muaps( + sta_dict=[aligned_sta1, aligned_sta2], + title="", + figsize=[5, 5], + showimmediately=False, + tight_layout=False, + ) + else: + muaps_fig = plot_muaps( + sta_dict=[self.sta_emgfile1[mu1], self.sta_emgfile2[mu2]], + title="", + figsize=[5, 5], + showimmediately=False, + tight_layout=False, + ) + + # If canvas already exist, destroy it + if hasattr(self, 'muaps_canvas'): + self.muaps_canvas.get_tk_widget().destroy() + + # Place the MUAPs figure in the canvas + self.muaps_canvas = FigureCanvasTkAgg( + muaps_fig, master=self.central_left_frm, + ) + self.muaps_canvas.draw_idle() # Await resizing + self.muaps_canvas.get_tk_widget().pack( + expand=True, fill="both", padx=0, pady=0, + ) + plt.close(muaps_fig) + + # Display IDR mu1 figure + idr_mu1_fig = plot_idr( + emgfile=self.emgfile1, + munumber=mu1, + addrefsig=self.addrefsig, + figsize=[3, 3], + showimmediately=False, + tight_layout=False, + ) + + if hasattr(self, 'idr_mu1_canvas'): + self.idr_mu1_canvas.get_tk_widget().destroy() + + self.idr_mu1_canvas = FigureCanvasTkAgg( + idr_mu1_fig, master=self.central_right_top_frm, + ) + self.idr_mu1_canvas.draw_idle() # Await resizing + self.idr_mu1_canvas.get_tk_widget().pack( + expand=True, fill="both", padx=0, pady=0, + ) + plt.close(idr_mu1_fig) + + # Display IDR mu2 figure + idr_mu2_fig = plot_idr( + emgfile=self.emgfile2, + munumber=mu2, + addrefsig=self.addrefsig, + figsize=[3, 3], + showimmediately=False, + tight_layout=False, + ) # TODO plot orange as MUAPs + + if hasattr(self, 'idr_mu2_canvas'): + self.idr_mu2_canvas.get_tk_widget().destroy() + + self.idr_mu2_canvas = FigureCanvasTkAgg( + idr_mu2_fig, master=self.central_right_bottom_frm, + ) + self.idr_mu2_canvas.draw_idle() # Await resizing + self.idr_mu2_canvas.get_tk_widget().pack( + expand=True, fill="both", padx=0, pady=0, + ) + plt.close(idr_mu2_fig) + + def include_exclude(self): + # Include or exclude the current MU pair + + # Check if the current pair is included or excluded and update label + pair = int(self.select_mu_pair_cb.get()) + if self.tracking_res.loc[pair, "Inclusion"] == "Included": + self.tracking_res.loc[pair, "Inclusion"] = "Excluded" + self.inclusion_label.config( + text="EXCLUDED", + foreground="red" + ) + else: + self.tracking_res.loc[pair, "Inclusion"] = "Included" + self.inclusion_label.config( + text="INCLUDED", + foreground="green" + ) + + # Update table + self.textbox.replace( + '1.0', + 'end', + self.tracking_res.to_string(float_format="{:.2f}".format), + ) + self.textbox.tag_delete("highlight") + self.highlight_row(pair) + + def copy_to_clipboard(self): + # Copy the dataframe to clipboard in csv format. - # Check that the number of plots does not exceed the number of cores - num_cores = os.cpu_count() - if len(tracking_res.index) > num_cores: - # If yes, raise a warning - warnings.warn("\n\nThere are more plots to show than available cores\n") + self.tracking_res.to_clipboard(excel=True, sep=self.csv_separator) - # Parallel execution of plotting - Parallel(n_jobs=-1)(delayed(parallel)(ind) for ind in tracking_res.index) + def get_results(self): + # Get the edited tracking_res - return tracking_res + return self.tracking_res def remove_duplicates_between( @@ -1149,7 +1612,11 @@ def remove_duplicates_between( custom_sorting_order=None, custom_muaps=None, filter=True, + multiprocessing=True, show=False, + gui=True, + gui_addrefsig=True, + gui_csv_separator="\t", which="munumber", ): """ @@ -1233,8 +1700,20 @@ def remove_duplicates_between( filter : bool, default True If true, when the same MU has a match of XCC > threshold with multiple MUs, only the match with the highest XCC is returned. + multiprocessing : bool, default True + If True (default) parallel processing will be used to reduce execution + time. show : bool, default False Whether to plot the STA of pairs of MUs with XCC above threshold. + gui : bool, default True + If True (default) a GUI for the visual inspection and manual selection + of the tracking results will be called. + gui_addrefsig : bool, default True + If True, the REF_SIGNAL is plotted in front of the IDR with a + separated y-axes. This is used only when gui=True. + gui_csv_separator : str, default "\t" + The field delimiter used by the GUI to create the .csv copied to the + clipboard. This is used only when gui=True. which : str {"munumber", "accuracy"}, default "munumber" How to remove the duplicated MUs. @@ -1251,6 +1730,8 @@ def remove_duplicates_between( tracking_res : pd.DataFrame The results of the tracking including the MU from file 1, MU from file 2 and the normalised cross-correlation value (XCC). + If gui=True, an additional column indicating the inclusion/exclusion + of the MUs pairs will also be present in tracking_res. See also -------- @@ -1261,7 +1742,29 @@ def remove_duplicates_between( Examples -------- - Remove duplicated MUs between two OTB files and save the emgfiles + Remove duplicated MUs between two OPENHDEMG files and inspect the tracking + outcome via a convenient GUI. Then Save the emgfiles without duplicates. + Of the 2 duplicated MUs, the one with the lowest accuracy is removed. + + >>> import openhdemg.library as emg + >>> emgfile1 = emg.askopenfile(filesource="OPENHDEMG") + >>> emgfile2 = emg.askopenfile(filesource="OPENHDEMG") + >>> emgfile1, emgfile2, tracking_res = emg.remove_duplicates_between( + ... emgfile1, + ... emgfile2, + ... firings="all", + ... derivation="mono", + ... timewindow=50, + ... threshold=0.9, + ... matrixcode="GR08MM1305", + ... orientation=180, + ... gui=True, + ... which="accuracy", + ... ) + >>> emg.asksavefile(emgfile1) + >>> emg.asksavefile(emgfile2) + + Remove duplicated MUs between two OTB files and directly save the emgfiles without duplicates. The duplicates are removed from the file with more MUs. @@ -1281,16 +1784,18 @@ def remove_duplicates_between( ... n_cols=None, ... filter=True, ... show=False, + ... gui=False, ... which="munumber", ... ) >>> emg.asksavefile(emgfile1) >>> emg.asksavefile(emgfile2) Remove duplicated MUs between two files where channels are sorted with a - custom order and save the emgfiles without duplicates. Of the 2 duplicated - MUs, the one with the lowest accuracy is removed. + custom order and directly save the emgfiles without duplicates. Of the 2 + duplicated MUs, the one with the lowest accuracy is removed. >>> import openhdemg.library as emg + >>> import numpy as np >>> emgfile1 = emg.askopenfile(filesource="CUSTOMCSV") >>> emgfile2 = emg.askopenfile(filesource="CUSTOMCSV") >>> custom_sorting_order = [ @@ -1314,6 +1819,7 @@ def remove_duplicates_between( ... custom_sorting_order=custom_sorting_order, ... filter=True, ... show=False, + ... gui=False, ... which="accuracy", ... ) >>> emg.asksavefile(emgfile1) @@ -1340,14 +1846,27 @@ def remove_duplicates_between( custom_muaps=custom_muaps, exclude_belowthreshold=True, filter=filter, + multiprocessing=multiprocessing, show=show, + gui=gui, + gui_addrefsig=gui_addrefsig, + gui_csv_separator=gui_csv_separator, ) + # If the tracking gui has been used to include/exclude pairs, use the + # included pairs only. + if gui: + tracking_res_cleaned = tracking_res[ + tracking_res["Inclusion"] == "Included" + ] + else: + tracking_res_cleaned = tracking_res + # Identify how to remove MUs if which == "munumber": if emgfile1["NUMBER_OF_MUS"] >= emgfile2["NUMBER_OF_MUS"]: # Remove MUs from emgfile1 - mus_to_remove = list(tracking_res["MU_file1"]) + mus_to_remove = list(tracking_res_cleaned["MU_file1"]) emgfile1 = delete_mus( emgfile=emgfile1, munumber=mus_to_remove, if_single_mu="remove" ) @@ -1356,7 +1875,7 @@ def remove_duplicates_between( else: # Remove MUs from emgfile2 - mus_to_remove = list(tracking_res["MU_file2"]) + mus_to_remove = list(tracking_res_cleaned["MU_file2"]) emgfile2 = delete_mus( emgfile=emgfile2, munumber=mus_to_remove, if_single_mu="remove" @@ -1369,7 +1888,7 @@ def remove_duplicates_between( # on ACCURACY value. to_remove1 = [] to_remove2 = [] - for i, row in tracking_res.iterrows(): + for i, row in tracking_res_cleaned.iterrows(): acc1 = emgfile1["ACCURACY"].loc[int(row["MU_file1"])] acc2 = emgfile2["ACCURACY"].loc[int(row["MU_file2"])] @@ -1535,7 +2054,7 @@ def estimate_cv_via_mle(emgfile, signal): sig = signal.to_numpy() sig = sig.T - # Prepare the input 1D signals for find_teta + # Prepare the input 1D signals for find_mle_teta if np.shape(sig)[0] > 3: sig1 = sig[1, :] sig2 = sig[2, :] @@ -1543,7 +2062,7 @@ def estimate_cv_via_mle(emgfile, signal): sig1 = sig[0, :] sig2 = sig[1, :] - teta = find_teta( + teta = find_mle_teta( sig1=sig1, sig2=sig2, ied=ied, @@ -1562,6 +2081,7 @@ def estimate_cv_via_mle(emgfile, signal): return cv +# TODO add function to return the results class MUcv_gui(): """ Graphical user interface for the estimation of MUs conduction velocity. @@ -1587,6 +2107,10 @@ class MUcv_gui(): The STA is calculated over all the firings. muaps_timewindow : int, default 50 Timewindow to compute ST MUAPs in milliseconds. + figsize : list, default [20, 15] + Size of the initial MUAPs figure in centimeters [width, height]. + csv_separator : str, default "\t" + The field delimiter used to create the .csv copied to the clipboard. See also -------- @@ -1621,6 +2145,7 @@ def __init__( n_firings=[0, 50], muaps_timewindow=50, figsize=[25, 20], + csv_separator="\t", ): # On start, compute the necessary information self.emgfile = emgfile @@ -1633,10 +2158,11 @@ def __init__( ) self.sta_xcc = xcc_sta(self.st) self.figsize = figsize + self.csv_separator = csv_separator # After that, set up the GUI self.root = tk.Tk() - self.root.title('MUs cv estimation') + self.root.title('MUs CV estimation') root_path = os.path.dirname(os.path.abspath(__file__)) iconpath = os.path.join( root_path, @@ -1647,20 +2173,33 @@ def __init__( ) self.root.iconbitmap(iconpath) - # Create main frame, assign structure and minimum spacing - self.frm = ttk.Frame(self.root, padding=15) - # Assign grid structure - self.frm.grid() + # Create outer frames, assign structure and minimum spacing + # Left + left_frm = tk.Frame(self.root, padx=2, pady=2) + left_frm.pack(side=tk.LEFT, expand=True, fill="both") + # Right + right_frm = tk.Frame(self.root, padx=4, pady=4) + right_frm.pack(side=tk.TOP, anchor="nw", expand=True, fill="y") + + # Create inner frames + # Top left + top_left_frm = tk.Frame(left_frm, padx=2, pady=2) + top_left_frm.pack(side=tk.TOP, anchor="nw", fill="x") + # Bottom left + self.bottom_left_frm = tk.Frame(left_frm, padx=2, pady=2) + self.bottom_left_frm.pack( + side=tk.TOP, anchor="nw", expand=True, fill="both", + ) # Label MU number combobox - munumber_label = ttk.Label(self.frm, text="MU number", width=15) + munumber_label = ttk.Label(top_left_frm, text="MU number", width=15) munumber_label.grid(row=0, column=0, columnspan=1, sticky=tk.W) # Create a combobox to change MU self.all_mus = list(range(emgfile["NUMBER_OF_MUS"])) self.selectmu_cb = ttk.Combobox( - self.frm, + top_left_frm, textvariable=tk.StringVar(), values=self.all_mus, state='readonly', @@ -1676,98 +2215,110 @@ def __init__( lambda event: self.gui_plot(), ) - # Add 2 empty columns - emp0 = ttk.Label(self.frm, text="", width=15) - emp0.grid(row=0, column=1, columnspan=1, sticky=tk.W) - emp1 = ttk.Label(self.frm, text="", width=15) - emp1.grid(row=0, column=2, columnspan=1, sticky=tk.W) + # Add empty column + emp = ttk.Label(top_left_frm, text="", width=15) + emp.grid(row=0, column=1, columnspan=1, sticky=tk.W) # Create the widgets to calculate CV # Label and combobox to select the matrix column - col_label = ttk.Label(self.frm, text="Column", width=15) - col_label.grid(row=0, column=3, columnspan=1, sticky=tk.W) + col_label = ttk.Label(top_left_frm, text="Column", width=15) + col_label.grid(row=0, column=2, columnspan=1, sticky=tk.W) self.columns = list(self.st[0].keys()) self.col_cb = ttk.Combobox( - self.frm, + top_left_frm, textvariable=tk.StringVar(), values=self.columns, state='readonly', width=15, ) - self.col_cb.grid(row=1, column=3, columnspan=1, sticky=tk.W) + self.col_cb.grid(row=1, column=2, columnspan=1, sticky=tk.W) self.col_cb.current(0) # Label and combobox to select the matrix channels self.rows = list(range(len(list(self.st[0][self.columns[0]].columns)))) - start_label = ttk.Label(self.frm, text="From row", width=15) - start_label.grid(row=0, column=4, columnspan=1, sticky=tk.W) + start_label = ttk.Label(top_left_frm, text="From row", width=15) + start_label.grid(row=0, column=3, columnspan=1, sticky=tk.W) self.start_cb = ttk.Combobox( - self.frm, + top_left_frm, textvariable=tk.StringVar(), values=self.rows, state='readonly', width=15, ) - self.start_cb.grid(row=1, column=4, columnspan=1, sticky=tk.W) + self.start_cb.grid(row=1, column=3, columnspan=1, sticky=tk.W) self.start_cb.current(0) - self.stop_label = ttk.Label(self.frm, text="To row", width=15) - self.stop_label.grid(row=0, column=5, columnspan=1, sticky=tk.W) + self.stop_label = ttk.Label(top_left_frm, text="To row", width=15) + self.stop_label.grid(row=0, column=4, columnspan=1, sticky=tk.W) self.stop_cb = ttk.Combobox( - self.frm, + top_left_frm, textvariable=tk.StringVar(), values=self.rows, state='readonly', width=15, ) - self.stop_cb.grid(row=1, column=5, columnspan=1, sticky=tk.W) + self.stop_cb.grid(row=1, column=4, columnspan=1, sticky=tk.W) self.stop_cb.current(max(self.rows)) # Button to start CV estimation self.ied = emgfile["IED"] self.fsamp = emgfile["FSAMP"] button_est = ttk.Button( - self.frm, + top_left_frm, text="Estimate", command=self.compute_cv, width=15, ) - button_est.grid(row=1, column=6, columnspan=1, sticky="we") + button_est.grid(row=1, column=5, columnspan=1, sticky="we") - # Add empty column - self.emp2 = ttk.Label(self.frm, text="", width=5) - self.emp2.grid(row=0, column=7, columnspan=1, sticky=tk.W) + # Configure column weights + for c in range(6): + if c == 1: + top_left_frm.columnconfigure(c, weight=20) + else: + top_left_frm.columnconfigure(c, weight=1) - # Add text frame to show the results (only CV and RMS) - self.res_df = pd.DataFrame( - data=0, - index=self.all_mus, - columns=["CV", "RMS", "XCC", "Column", "From_Row", "To_Row"], - ) - self.textbox = tk.Text(self.frm, width=20) - self.textbox.grid(row=2, column=8, sticky="ns") - self.textbox.insert('1.0', self.res_df.loc[:, ["CV", "RMS"]].to_string()) + # Align the right_frm + alignment_label = ttk.Label(right_frm, text="", width=15) + alignment_label.pack(side=tk.TOP, fill="x") # Create a button to copy the dataframe to clipboard copy_btn = ttk.Button( - self.frm, + right_frm, text="Copy results", command=self.copy_to_clipboard, - width=20, + width=15, + ) + copy_btn.pack(side=tk.TOP, fill="x", pady=(0, 5)) + + # Add text frame to show the results (only CV and RMS) + self.res_df = pd.DataFrame( + data=0.00, + index=self.all_mus, + columns=["CV", "RMS", "XCC", "Column", "From_Row", "To_Row"], + ) + self.textbox = tk.Text(right_frm, width=25) + self.textbox.pack(side=tk.TOP, expand=True, fill="y") + self.textbox.insert( + '1.0', + self.res_df.loc[:, ["CV", "RMS", "XCC"]].to_string( + float_format="{:.2f}".format + ), ) - copy_btn.grid(row=1, column=8, columnspan=1, sticky="we") # Plot MU 0 while opening the GUI, # this will move the GUI in the background ??. self.gui_plot() - # Bring back the GUI in in the foreground + # Bring back the GUI in the foreground self.root.lift() + self.root.attributes('-topmost', True) + self.root.after_idle(self.root.attributes, '-topmost', False) # Start the main loop self.root.mainloop() @@ -1788,16 +2339,22 @@ def gui_plot(self): figsize=self.figsize, ) + # If canvas already exists, destroy it + if hasattr(self, 'canvas'): + self.canvas.get_tk_widget().destroy() + # Place the figure in the GUI - canvas = FigureCanvasTkAgg(fig, master=self.frm) - canvas.draw() - canvas.get_tk_widget().grid(row=2, column=0, columnspan=7, sticky="we") + self.canvas = FigureCanvasTkAgg(fig, master=self.bottom_left_frm) + self.canvas.draw_idle() # Await resizing + self.canvas.get_tk_widget().pack( + expand=True, fill="both", padx=0, pady=0, + ) plt.close() def copy_to_clipboard(self): # Copy the dataframe to clipboard in csv format. - pyperclip.copy(self.res_df.to_csv(index=False, sep='\t')) + self.res_df.to_clipboard(excel=True, sep=self.csv_separator) # Define functions for cv estimation def compute_cv(self): @@ -1839,5 +2396,7 @@ def compute_cv(self): self.textbox.replace( '1.0', 'end', - self.res_df.loc[:, ["CV", "RMS"]].round(3).to_string(), + self.res_df.loc[:, ["CV", "RMS", "XCC"]].to_string( + float_format="{:.2f}".format + ), ) diff --git a/openhdemg/library/pic.py b/openhdemg/library/pic.py new file mode 100644 index 0000000..adf74f4 --- /dev/null +++ b/openhdemg/library/pic.py @@ -0,0 +1,321 @@ +""" +This module contains all the functions used to quantify and analyze MU +persistent inward currents. + +Currently includes delta F. +""" + +import pandas as pd +import numpy as np +from itertools import combinations + + +def compute_deltaf( + emgfile, + smoothfits, + average_method="test_unit_average", + normalisation="False", + recruitment_difference_cutoff=1.0, + corr_cutoff=0.7, + controlunitmodulation_cutoff=0.5, + clean=True, +): + """ + Quantify delta F via paired motor unit analysis. + + Conducts a paired motor unit analysis, quantifying delta F between the + supplied collection of motor units. Origional framework for deltaF provided + in Gorassini et. al., 2002: + https://journals.physiology.org/doi/full/10.1152/jn.00024.2001 + + Author: James (Drew) Beauchamp + + Parameters + ---------- + emgfile : dict + The dictionary containing the emgfile. + smoothfits : list of arrays + Smoothed discharge rate estimates. + Each array: motor unit discharge rate x samples aligned in time; + instances of non-firing = NaN + Your choice of smoothing. See compute_svr gen_svr for example. + average_method : str {"test_unit_average", "all"}, default "test_unit_average" + The method for test MU deltaF value. More to be added. + + ``test_unit_average`` + The average across all possible control units. + + ``all`` + This returns all possible MU pairs + normalisation : str {"False", "ctrl_max_desc"}, default "False" + The method for deltaF nomalization. + + ``ctrl_max_desc`` + Whether to normalise deltaF values to control unit descending + range during test unit firing. See Skarabot et. al., 2023: + https://www.biorxiv.org/content/10.1101/2023.10.16.562612v1 + recruitment_difference_cutoff : float, default 1 + An exlusion criteria corresponding to the necessary difference between + control and test MU recruitement in seconds. + corr_cutoff : float (0 to 1), default 0.7 + An exclusion criteria corresponding to the correlation between control + and test unit discharge rate. + controlunitmodulation_cutoff : float, default 0.5 + An exclusion criteria corresponding to the necessary modulation of + control unit discharge rate during test unit firing in Hz. + clean : bool, default True + To remove values that do not meet exclusion criteria + + Returns + ------- + delta_f : pd.DataFrame + A pd.DataFrame containing deltaF values and corresponding MU number. + The resulting df will be different depending on average_method. + In particular, if average_method="all", delta_f[MU][row] will + contain a tuple representing the indices of the two motor units + for each given pair (reporter, test) and their corresponding + deltaF value. + + See also + -------- + - compute_svr : fit MU discharge rates with Support Vector Regression, + nonlinear regression. + + Examples + -------- + Quantify delta F using svr fits. + + >>> import openhdemg.library as emg + >>> emgfile = emg.emg_from_samplefile() + >>> emgfile = emg.sort_mus(emgfile=emgfile) + >>> svrfits = emg.compute_svr(emgfile) + >>> delta_f = emg.compute_deltaf( + ... emgfile=emgfile, smoothfits=svrfits["gensvr"], + ... ) + delta_f + MU dF + 0 0 NaN + 1 1 NaN + 2 2 NaN + 3 3 1.838382 + 4 4 2.709522 + + For all possible combinations, not test unit average, MU in this case is + pairs (reporter, test). + + >>> delta_f_2 = emg.compute_deltaf( + ... emgfile=emgfile, smoothfits=svrfits["gensvr"], average_method='all', + ... ) + delta_f_2 + MU dF + 0 (0, 1) NaN + 1 (0, 2) NaN + 2 (0, 3) 2.127461 + 3 (0, 4) NaN + 4 (1, 2) NaN + 5 (1, 3) 1.549303 + 6 (1, 4) NaN + 7 (2, 3) NaN + 8 (2, 4) NaN + 9 (3, 4) 2.709522 + """ + + dfret_ret = [] + mucombo_ret = np.empty(0, int) + + # If less than 2 MUs, can not quantify deltaF + if emgfile["NUMBER_OF_MUS"] < 2: + dfret_ret = np.nan + mucombo_ret = np.nan*np.ones([1, 2]) + + delta_f = pd.DataFrame({'MU': mucombo_ret, 'dF': dfret_ret}) + + return delta_f + + # If more than 2 MUs, quantify deltaF. + # Combinations of MUs. + combs = combinations(range(emgfile["NUMBER_OF_MUS"]), 2) + # TODO if units are nonconsecutive + + # init + r_ret = [] + dfret = [] + testmu = [] + ctrl_mod = [] + mucombo = [] + rcrt_diff = [] + controlmu = [] + for mucomb in list(combs): # For all possible combinations of MUs + # Extract possible MU combinations (a unique MU pair) + mu1_id, mu2_id = mucomb[0], mucomb[1] + # Track current MU combination + mucombo.append((mu1_id, mu2_id)) + + # First MU firings, recruitment, and decrecruitment + mu1_times = np.where(emgfile["BINARY_MUS_FIRING"][mu1_id] == 1)[0] + mu1_rcrt, mu1_drcrt = mu1_times[1], mu1_times[-1] + # Skip first since idr is defined on second + + # Second MU firings, recruitment, and decrecruitment + mu2_times = np.where(emgfile["BINARY_MUS_FIRING"][mu2_id] == 1)[0] + mu2_rcrt, mu2_drcrt = mu2_times[1], mu2_times[-1] + # Skip first since idr is defined on second + + # Region of MU overlap + muoverlap = range( + max(mu1_rcrt, mu2_rcrt), min(mu1_drcrt, mu2_drcrt), + ) + + # If MUs do not overlapt by more than two or more samples + if len(muoverlap) < 2: + dfret = np.append(dfret, np.nan) + r_ret = np.append(r_ret, np.nan) + rcrt_diff = np.append(rcrt_diff, np.nan) + ctrl_mod = np.append(ctrl_mod, np.nan) + continue # TODO test + + # Corr between units - not always necessary, can be set to 0 when + # desired. + r = pd.DataFrame( + zip( + smoothfits[mu1_id][muoverlap], + smoothfits[mu2_id][muoverlap], + ) + ).corr() + r_ret = np.append(r_ret, r[0][1]) + + # Recruitment diff, necessary to ensure PICs are activated in + # control unit. + rcrt_diff = np.append( + rcrt_diff, np.abs(mu1_rcrt-mu2_rcrt)/emgfile["FSAMP"], + ) + if mu1_rcrt < mu2_rcrt: + controlU = 1 # MU 1 is control unit, 2 is test unit + + # If control (reporter) unit is not on for entirety of test + # unit, set last firing to control unit. + if mu1_drcrt < mu2_drcrt: + mu2_drcrt = mu1_drcrt + # This may understimate PICs, other methods can be employed + # delta F: change in control MU discharge rate between test + # unit recruitment and derecruitment. + df = smoothfits[mu1_id][mu2_rcrt]-smoothfits[mu1_id][mu2_drcrt] + + # Control unit discharge rate modulation while test unit is + # firing. + ctrl_mod = np.append( + ctrl_mod, + np.nanmax(smoothfits[mu1_id][range(mu2_rcrt, mu2_drcrt)]) + - np.nanmin(smoothfits[mu1_id][range(mu2_rcrt, mu2_drcrt)]), + ) + + if normalisation == "False": + dfret = np.append(dfret, df) + elif normalisation == "ctrl_max_desc": + # Normalise deltaF values to control unit descending range + # during test unit firing. + k = smoothfits[mu1_id][mu2_rcrt]-smoothfits[mu1_id][mu1_drcrt] + dfret = np.append(dfret, df/k) + + elif mu1_rcrt > mu2_rcrt: + controlU = 2 # MU 2 is control unit, 1 is test unit + if mu1_drcrt > mu2_drcrt: + # If control (reporter) unit is not on for entirety of + # test unit, set last firing to control unit. + mu1_drcrt = mu2_drcrt + # This may understimate PICs, other methods can be employed. + # delta F: change in control MU discharge rate between test + # unit recruitment and derecruitment. + df = smoothfits[mu2_id][mu1_rcrt]-smoothfits[mu2_id][mu1_drcrt] + + # Control unit discharge rate modulation while test unit is + # firing. + ctrl_mod = np.append( + ctrl_mod, + np.nanmax(smoothfits[mu2_id][range(mu1_rcrt, mu1_drcrt)]) + - np.nanmin(smoothfits[mu2_id][range(mu1_rcrt, mu1_drcrt)]), + ) + + if normalisation == "False": + dfret = np.append(dfret, df) + elif normalisation == "ctrl_max_desc": + # Normalise deltaF values to control unit descending range + # during test unit firing. + k = smoothfits[mu2_id][mu1_rcrt]-smoothfits[mu2_id][mu2_drcrt] + dfret = np.append(dfret, df/k) + + elif mu1_rcrt == mu2_rcrt: + if mu1_drcrt > mu2_drcrt: + controlU = 1 # MU 1 is control unit, 2 is test unit + # delta F: change in control MU discharge rate between + # test unit recruitment and derecruitment. + df = smoothfits[mu1_id][mu2_rcrt]-smoothfits[mu1_id][mu2_drcrt] + + # Control unit discharge rate modulation while test unit is + # firing. + ctrl_mod = np.append( + ctrl_mod, + np.nanmax(smoothfits[mu1_id][range(mu2_rcrt, mu2_drcrt)]) + - np.nanmin(smoothfits[mu1_id][range(mu2_rcrt, mu2_drcrt)]), + ) + + if normalisation == "False": + dfret = np.append(dfret, df) + elif normalisation == "ctrl_max_desc": + # Normalise deltaF values to control unit descending + # range during test unit firing. + k = smoothfits[mu1_id][mu2_rcrt]-smoothfits[mu1_id][mu1_drcrt] + dfret = np.append(dfret, df/k) + else: + controlU = 2 # MU 2 is control unit, 1 is test unit + # delta F: change in control MU discharge rate between + # test unit recruitment and derecruitment. + df = smoothfits[mu2_id][mu1_rcrt]-smoothfits[mu2_id][mu1_drcrt] + + # Control unit discharge rate modulation while test unit is + # firing. + ctrl_mod = np.append( + ctrl_mod, + np.nanmax(smoothfits[mu2_id][range(mu1_rcrt, mu1_drcrt)]) + - np.nanmin(smoothfits[mu2_id][range(mu1_rcrt, mu1_drcrt)]), + ) + + if normalisation == "False": + dfret = np.append(dfret, df) + elif normalisation == "ctrl_max_desc": + # Normalise deltaF values to control unit descending + # range during test unit firing. + k = smoothfits[mu2_id][mu1_rcrt]-smoothfits[mu2_id][mu2_drcrt] + dfret = np.append(dfret, df/k) + + # Collect which MUs were control vs test + controlmu.append(mucombo[-1][controlU-1]) + testmu.append(mucombo[-1][1-controlU//2]) + + if clean: # Remove values that dont meet exclusion criteria + rcrt_diff_bin = rcrt_diff > recruitment_difference_cutoff + corr_bin = r_ret > corr_cutoff + ctrl_mod_bin = ctrl_mod > controlunitmodulation_cutoff + clns = np.asarray([rcrt_diff_bin & corr_bin & ctrl_mod_bin]) + dfret[~clns[0]] = np.nan + + if average_method == "test_unit_average": + # Average across all control units + for ii in range(emgfile["NUMBER_OF_MUS"]): + clean_indices = [ + index for (index, item) in enumerate(testmu) if item == ii + ] + if np.isnan(dfret[clean_indices]).all(): + dfret_ret = np.append(dfret_ret, np.nan) + else: + dfret_ret = np.append( + dfret_ret, np.nanmean(dfret[clean_indices]), + ) + mucombo_ret = np.append(mucombo_ret, int(ii)) + else: # Return all values and corresponding combinations + dfret_ret = dfret + mucombo_ret = mucombo + + delta_f = pd.DataFrame({'MU': mucombo_ret, 'dF': dfret_ret}) + + return delta_f diff --git a/openhdemg/library/plotemg.py b/openhdemg/library/plotemg.py index b8d1219..fedc1b9 100644 --- a/openhdemg/library/plotemg.py +++ b/openhdemg/library/plotemg.py @@ -6,6 +6,7 @@ import matplotlib.pyplot as plt import seaborn as sns import pandas as pd +import numpy as np from openhdemg.library.tools import compute_idr from openhdemg.library.mathtools import min_max_scaling @@ -148,6 +149,7 @@ def plot_emgsig( for count, thisChannel in enumerate(channels): # Normalise the series norm_raw = min_max_scaling(emgfile["RAW_SIGNAL"][thisChannel]) + # TODO option to scale all the df or the single channel # Add 1 to the previous channel to avoid overlapping norm_raw = norm_raw + count plt.plot(x_axis, norm_raw) @@ -851,8 +853,6 @@ def plot_idr( ) # Check if we have a single MU or a list of MUs to plot. - # In the first case, the use of plt.figure has been preferred to - # plt.subplots for the implementation of the MUs cleaning. if isinstance(munumber, int): ax1 = sns.scatterplot( x=idr[munumber]["timesec" if timeinseconds else "mupulses"], @@ -885,6 +885,8 @@ def plot_idr( ax1.set_xlabel("Time (Sec)" if timeinseconds else "Samples") elif len(munumber) == 1: + # TODO remove this part and check len before. + # See plot_smoothed_dr as a reference ax1 = sns.scatterplot( x=idr[munumber[0]]["timesec" if timeinseconds else "mupulses"], y=idr[munumber[0]]["idr"], @@ -918,11 +920,252 @@ def plot_idr( return fig +def plot_smoothed_dr( + emgfile, + smoothfits, + munumber="all", + addidr=True, + stack=True, + addrefsig=True, + timeinseconds=True, + figsize=[20, 15], + showimmediately=True, + tight_layout=True, +): + """ + Plot the smoothed discharge rate. + + Parameters + ---------- + emgfile : dict + The dictionary containing the emgfile. + smoothfits : pd.DataFrame + Smoothed discharge rate estimates aligned in time. Columns should + contain the smoothed discharge rate for each MU. + munumber : str, int or list, default "all" + + ``all`` + IDR of all the MUs is plotted. + + Otherwise, a single MU (int) or multiple MUs (list of int) can be + specified. + The list can be passed as a manually-written list or with: + munumber=[*range(0, 12)]. + We need the " * " operator to unpack the results of range into a list. + munumber is expected to be with base 0 (i.e., the first MU in the file + is the number 0). + addidr : bool, default True + Whether to show also the IDR behind the smoothed DR line. + stack : bool, default True + Whether to stack multiple MUs. If False, all the MUs smoothed DR will + be plotted on the same line. + addrefsig : bool, default True + If True, the REF_SIGNAL is plotted in front of the MUs IDR with a + separated y-axes. + timeinseconds : bool, default True + Whether to show the time on the x-axes in seconds (True) + or in samples (False). + figsize : list, default [20, 15] + Size of the figure in centimeters [width, height]. + showimmediately : bool, default True + If True (default), plt.show() is called and the figure showed to the + user. + It is useful to set it to False when calling the function from the GUI. + tight_layout : bool, default True + If True (default), the plt.tight_layout() is called and the figure's + layout is improved. + It is useful to set it to False when calling the function from the GUI. + + Returns + ------- + fig : pyplot `~.figure.Figure` + + See also + -------- + - compute_svr : Fit MU discharge rates with Support Vector Regression, + nonlinear regression. + + Notes + ----- + munumber = "all" corresponds to: + munumber = [*range(0, emgfile["NUMBER_OF_MUS"])] + + Examples + -------- + Smooth MUs DR using Support Vector Regression. + + >>> import openhdemg.library as emg + >>> import pandas as pd + >>> emgfile = emg.emg_from_samplefile() + >>> emgfile = emg.sort_mus(emgfile=emgfile) + >>> svrfits = emg.compute_svr(emgfile) + + Plot the stacked smoothed DR of all the MUs and overlay the IDR and the + reference signal. + + >>> smoothfits = pd.DataFrame(svrfits["gensvr"]).transpose() + >>> emg.plot_smoothed_dr( + >>> emgfile, + >>> smoothfits=smoothfits, + >>> munumber="all", + >>> addidr=True, + >>> stack=True, + >>> addrefsig=True, + >>> ) + """ + + # Compute the IDR + idr = compute_idr(emgfile=emgfile) + + if addrefsig: + if not isinstance(emgfile["REF_SIGNAL"], pd.DataFrame): + raise TypeError( + "REF_SIGNAL is probably absent or it is not contained in a dataframe" + ) + + # Check if all the MUs have to be plotted + if isinstance(munumber, list): + if len(munumber) == 1: # Manage exception of single MU + munumber = munumber[0] + elif isinstance(munumber, str): + if emgfile["NUMBER_OF_MUS"] == 1: # Manage exception of single MU + munumber = 0 + else: + munumber = [*range(0, emgfile["NUMBER_OF_MUS"])] + + # Check smoothfits type + if not isinstance(smoothfits, pd.DataFrame): + raise TypeError("smoothfits must be a pd.DataFrame") + + # Use the subplot function to allow for the use of twinx() + fig, ax1 = plt.subplots( + figsize=(figsize[0] / 2.54, figsize[1] / 2.54), + num="Smoothed DR", + ) + + # Check if we have a single MU or a list of MUs to plot. + if isinstance(munumber, int): + # Plot IDR + if addidr: + sns.scatterplot( + x=idr[munumber]["timesec" if timeinseconds else "mupulses"], + y=idr[munumber]["idr"], + ax=ax1, + alpha=0.4, + ) # sns.scatterplot breaks if x or y are nan? + + # Plot smoothed DR + if timeinseconds: + x_smooth = np.arange(len(smoothfits[munumber])) / emgfile["FSAMP"] + else: + x_smooth = np.arange(len(smoothfits[munumber])) + ax1.plot(x_smooth, smoothfits[munumber], linewidth=2) + + # Labels + ax1.set_ylabel( + "MU {} (PPS)".format(munumber) + ) # Useful because if the MU is empty it won't show the channel number + ax1.set_xlabel("Time (Sec)" if timeinseconds else "Samples") + + elif isinstance(munumber, list) and stack: + for count, thisMU in enumerate(munumber): + # Plot IDR + if addidr: + # Normalise the IDR series and add 1 to the previous MUs to + # avoid overlapping of the MUs. + norm_idr = min_max_scaling(idr[thisMU]["idr"]) + count + sns.scatterplot( + x=idr[thisMU]["timesec" if timeinseconds else "mupulses"], + y=norm_idr, + ax=ax1, + alpha=0.4, + ) + + # Plot smoothed DR + if timeinseconds: + x_smooth = np.arange( + len(smoothfits[thisMU]) + ) / emgfile["FSAMP"] + else: + x_smooth = np.arange(len(smoothfits[thisMU])) + + if addidr: + # Scale factor to match smoothed DR and IDR when stacked + delta = ( + (smoothfits[thisMU].max() - smoothfits[thisMU].min()) / + (idr[thisMU]["idr"].max() - idr[thisMU]["idr"].min()) + ) + delta_min = ( + (smoothfits[thisMU].min() - idr[thisMU]["idr"].min()) / + (idr[thisMU]["idr"].max() - idr[thisMU]["idr"].min()) + ) + norm_fit = min_max_scaling(smoothfits[thisMU]) + norm_fit = norm_fit * delta + delta_min + count + ax1.plot(x_smooth, norm_fit, linewidth=2) + else: + norm_fit = min_max_scaling(smoothfits[thisMU]) + count + ax1.plot(x_smooth, norm_fit, linewidth=2) + + # Ensure correct and complete ticks on the left y axis + ax1.set_yticks([*range(len(munumber))]) + ax1.set_yticklabels([str(mu) for mu in munumber]) + # Set axes labels + ax1.set_ylabel("Motor units") + ax1.set_xlabel("Time (Sec)" if timeinseconds else "Samples") + + elif isinstance(munumber, list) and not stack: + for count, thisMU in enumerate(munumber): + # Plot IDR + if addidr: + sns.scatterplot( + x=idr[thisMU]["timesec" if timeinseconds else "mupulses"], + y=idr[thisMU]["idr"], + ax=ax1, + alpha=0.4, + ) + + # Plot smoothed DR + if timeinseconds: + x_smooth = np.arange( + len(smoothfits[thisMU]) + ) / emgfile["FSAMP"] + else: + x_smooth = np.arange(len(smoothfits[thisMU])) + ax1.plot(x_smooth, smoothfits[thisMU], linewidth=2) + + # Set axes labels + ax1.set_ylabel("Discharge rate (PPS)") + ax1.set_xlabel("Time (Sec)" if timeinseconds else "Samples") + + else: + raise TypeError( + "While calling the plot_idr function, you should pass an integer, a list or 'all' to munumber" + ) + + if addrefsig: + ax2 = ax1.twinx() + # Plot the ref signal + xref = ( + emgfile["REF_SIGNAL"].index / emgfile["FSAMP"] + if timeinseconds + else emgfile["REF_SIGNAL"].index + ) + sns.lineplot(y=emgfile["REF_SIGNAL"][0], x=xref, color="0.4", ax=ax2) + ax2.set_ylabel("MVC") + + showgoodlayout(tight_layout, despined="2yaxes" if addrefsig else False) + if showimmediately: + plt.show() + + return fig + + def plot_muaps( sta_dict, title="MUAPs from STA", figsize=[20, 15], showimmediately=True, + tight_layout=False, ): """ Plot MUAPs obtained from STA from one or multiple MUs. @@ -942,6 +1185,11 @@ def plot_muaps( If True (default), plt.show() is called and the figure showed to the user. It is useful to set it to False when calling the function from the GUI. + tight_layout : bool, default False + If True (default), the plt.tight_layout() is called and the figure's + layout is improved. + It is useful to set it to False when calling the function from the GUI + or if the final layout is not correct. Returns ------- @@ -1088,7 +1336,7 @@ def plot_muaps( "Unacceptable number of rows and columns to plot" ) - showgoodlayout(tight_layout=False, despined=True) + showgoodlayout(tight_layout=tight_layout, despined=True) if showimmediately: plt.show() @@ -1328,6 +1576,7 @@ def plot_muaps_for_cv( title="MUAPs for CV", figsize=[20, 15], showimmediately=True, + tight_layout=False, ): """ Visualise MUAPs on which to calculate MUs CV. @@ -1351,6 +1600,11 @@ def plot_muaps_for_cv( If True (default), plt.show() is called and the figure showed to the user. It is useful to set it to False when calling the function from the GUI. + tight_layout : bool, default False + If True (default), the plt.tight_layout() is called and the figure's + layout is improved. + It is useful to set it to False when calling the function from the GUI + or if the final layout is not correct. Returns ------- diff --git a/openhdemg/library/tools.py b/openhdemg/library/tools.py index 541dc43..0a5a92d 100644 --- a/openhdemg/library/tools.py +++ b/openhdemg/library/tools.py @@ -10,6 +10,8 @@ import numpy as np import matplotlib.pyplot as plt from scipy import signal +from scipy.stats import iqr +from sklearn.svm import SVR import warnings from openhdemg.library.mathtools import compute_sil @@ -144,7 +146,7 @@ def create_binary_firings(emg_length, number_of_mus, mupulses): if not isinstance(mupulses, list): raise ValueError("mupulses is not a list of ndarrays") - # Initialize a pd.DataFrame with zeros + # Initialise a pd.DataFrame with zeros binary_MUs_firing = pd.DataFrame( np.zeros((emg_length, number_of_mus), dtype=int) ) @@ -292,8 +294,8 @@ def resize_emgfile( # Double check that start_, end_ are within the real range. if start_ < 0: start_ = 0 - if end_ > emgfile["REF_SIGNAL"].shape[0]: - end_ = emgfile["REF_SIGNAL"].shape[0] + if end_ > emgfile["RAW_SIGNAL"].shape[0]: + end_ = emgfile["RAW_SIGNAL"].shape[0] # Create the object to store the resized emgfile. rs_emgfile = copy.deepcopy(emgfile) @@ -1195,3 +1197,203 @@ def compute_rfd( rfd = rfd * conversion_val return rfd + + +def compute_svr( + emgfile, + gammain=1/1.6, + regparam=1/0.370, + endpointweights_numpulses=5, + endpointweights_magnitude=5, + discontfiring_dur=1.0, +): + """ + Fit MU discharge rates with Support Vector Regression, nonlinear + regression. + + Provides smooth and continous estimates of discharge rate useful for + quantification and visualisation. Suggested hyperparameters and framework + from Beauchamp et. al., 2022 + https://doi.org/10.1088/1741-2552/ac4594 + + Author: James (Drew) Beauchamp + + Parameters + ---------- + emgfile : dict + The dictionary containing the emgfile. + gammain : float, default 1/1.6 + The kernel coefficient. + regparam : float, default 1/0.370 + The regularization parameter, must be positive. + endpointweights_numpulses : int, default 5 + Number of discharge instances at the start and end of MU firing to + apply a weighting coefficient. + endpointweights_magnitude : int, default 5 + The scaling factor applied to the number of pulses provided by + endpointweights_numpulses. + The scaling is applied to the regularization parameter, per sample. + Larger values force the classifier to put more emphasis on the number + of discharge instances at the start and end of firing provided by + endpointweights_numpulses. + discontfiring_dur : int, default 1 + Duration of time in seconds that defines an instnance of discontinuous + firing. SVR fits will not be returned at points of discontinuity. + + Returns + ------- + svrfits : pd.DataFrame + A pd.DataFrame containing the smooth/continous MU discharge rates and + corresponding time vectors. + + See also + -------- + - compute_deltaf : quantify delta F via paired motor unit analysis. + + Examples + -------- + Quantify svr fits. + + >>> import openhdemg.library as emg + >>> import pandas as pd + >>> emgfile = emg.emg_from_samplefile() + >>> emgfile = emg.sort_mus(emgfile=emgfile) + >>> svrfits = emg.compute_svr(emgfile) + + Quick plot showing the results. + + >>> smoothfits = pd.DataFrame(svrfits["gensvr"]).transpose() + >>> emg.plot_smoothed_dr( + >>> emgfile, + >>> smoothfits=smoothfits, + >>> munumber="all", + >>> addidr=False, + >>> stack=True, + >>> addrefsig=True, + >>> ) + """ + + # TODO input checking and edge cases + idr = compute_idr(emgfile) # Calc IDR + + svrfit_acm = [] + svrtime_acm = [] + gensvr_acm = [] + for mu in range(len(idr)): # For all MUs + # Train the model on the data. + # Time vector, removing first element. + xtmp = np.transpose([idr[mu].timesec[1:]]) + # Discharge rates, removing first element, since DR has been assigned + # to second pulse. + ytmp = idr[mu].idr[1:].to_numpy() + # Time between discharges, will use for discontinuity calc + xdiff = idr[mu].diff_mupulses[1:].values + # Motor unit pulses, samples + mup = np.array(idr[mu].mupulses) + + # Defining weight vector. A scaling applied to the regularization + # parameter, per sample. + smpwht = np.ones(len(ytmp)) + smpwht[0:endpointweights_numpulses-1] = endpointweights_magnitude + smpwht[(len(ytmp)-(endpointweights_numpulses-1)):len(ytmp)] = endpointweights_magnitude + + # Create an SVR model with a gausian kernel and supplied hyperparams. + # Origional hyperparameters from Beauchamp et. al., 2022: + # https://doi.org/10.1088/1741-2552/ac4594 + svr = SVR( + kernel='rbf', gamma=gammain, C=np.abs(regparam), + epsilon=iqr(ytmp)/11, + ) + svr.fit(xtmp, ytmp, sample_weight=smpwht) + + # Defining prediction vector + # TODO need to add custom range. + # From the second firing to the end of firing, in samples. + predind = np.arange(mup[1], mup[-1]+1) + predtime = (predind/emgfile["FSAMP"]).reshape(-1, 1) # In time (s) + # Initialise nan vector for tracking fits aligned in time. Usefull for + # later quant metrics. + gen_svr = np.nan*np.ones(emgfile["EMG_LENGTH"]) + + # Check for discontinous firing + bkpnt = mup[ + np.where((xdiff > (discontfiring_dur * emgfile["FSAMP"])))[0] + ] + bkpnt = bkpnt[np.where(bkpnt != mup[-1])] + + # Make predictions on the data + if bkpnt.size > 0: # If there is a point of discontinuity + if bkpnt[0] == mup[0]: # When first firing is discontinuity + smoothfit = np.nan*np.ones(1) + newtm = np.nan*np.ones(1) + bkpnt = bkpnt[1:] + else: + tmptm = predtime[ + 0: np.where( + (bkpnt[0] >= predind[0:-1]) & (bkpnt[0] < predind[1:]) + )[0][0] + ] # Break up time vector for first continous range of firing + smoothfit = svr.predict(tmptm) # Predict with svr model + newtm = tmptm # Track new time vector + + tmpind = predind[ + 0: np.where( + (bkpnt[0] >= predind[0:-1]) & (bkpnt[0] < predind[1:]) + )[0][0] + ] # Sample vector of first continous range of firing + # Fill corresponding sample indices with svr fit + gen_svr[tmpind.astype(np.int64)] = smoothfit + # Add last firing as discontinuity + bkpnt = np.append(bkpnt, mup[-1]) + for ii in range(bkpnt.size-1): # All instances of discontinuity + curind = np.where( + (bkpnt[ii] >= predind[0:-1]) & (bkpnt[ii] < predind[1:]) + )[0][0] # Current index of discontinuity + nextind = np.where( + (bkpnt[ii+1] >= predind[0:-1]) & (bkpnt[ii+1] <= predind[1:]) + )[0][0] # Next index of discontinuity + + # MU firing before discontinuity + curmup = np.where(mup == bkpnt[ii])[0][0] + curind_nmup = np.where( + (mup[curmup+1] >= predind[0:-1]) & (mup[curmup+1] <= predind[1:]) + )[0][0] # MU firing after discontinuity + + # If the next discontinuity is the next MU firing, nan fill + if curind_nmup >= nextind: + # Edge case NEED TO CHECK THE GREATER THAN CASE>> WHY TODO + smoothfit = np.append(smoothfit, np.nan*np.ones(1)) + newtm = np.append(newtm, np.nan*np.ones(1)) + else: # Fit next continuous region of firing + smoothfit = np.append( + smoothfit, + np.nan*np.ones(len(predtime[curind:curind_nmup])-2), + ) + smoothfit = np.append( + smoothfit, svr.predict(predtime[curind_nmup:nextind]), + ) + newtm = np.append( + newtm, + np.nan*np.ones(len(predtime[curind:curind_nmup])-2), + ) + newtm = np.append(newtm, predtime[curind_nmup:nextind]) + gen_svr[predind[curind_nmup:nextind]] = svr.predict( + predtime[curind_nmup:nextind] + ) + else: + smoothfit = svr.predict(predtime) + newtm = predtime + gen_svr[predind] = smoothfit + + # Append fits, new time vect, time aligned fits + svrfit_acm.append(smoothfit.copy()) + svrtime_acm.append(newtm.copy()) + gensvr_acm.append(gen_svr.copy()) + + svrfits = { + "svrfit": svrfit_acm, + "svrtime": svrtime_acm, + "gensvr": gensvr_acm, + } + + return svrfits diff --git a/openhdemg/tests/unit/test_anlysis.py b/openhdemg/tests/unit/test_anlysis.py new file mode 100644 index 0000000..4c7bd62 --- /dev/null +++ b/openhdemg/tests/unit/test_anlysis.py @@ -0,0 +1,468 @@ +""" +To run the tests using unittest, execute from the openhdemg/tests directory: + python -m unittest discover + +First, you should dowload all the files necessary for the testing and store +them inside openhdemg/tests/fixtures. The files are available at: +https://drive.google.com/drive/folders/1suCZSils8rSCs2E3_K25vRCbN3AFDI7F?usp=sharing + +IMPORTANT: Do not alter the content of the dowloaded folder! + +WARNING!!! Since the library's functions perform complex tasks and return +complex data structures, these tests can verify that no critical errors occur, +but the accuracy of each function must be assessed independently upon creation, +or at each revision of the code. + +WARNING!!! - UNTESTED FUNCTIONS: none +""" + + +import unittest +from openhdemg.tests.unit.functions_for_unit_test import get_directories as getd + +from openhdemg.library.openfiles import emg_from_samplefile + +from openhdemg.library.analysis import ( + compute_thresholds, compute_dr, basic_mus_properties, compute_covisi, + compute_drvariability, +) + +import numpy as np + + +class TestAnalysis(unittest.TestCase): + """ + Test the functions/classes in the analysis module. + """ + + def test_compute_thresholds(self): + """ + Test the compute_thresholds function with the samplefile. + """ + + # Load the decomposed samplefile + emgfile = emg_from_samplefile() + + # Default parameters + res = compute_thresholds( + emgfile=emgfile, + event_="rt_dert", + type_="abs_rel", + n_firings=1, + mvc=1234, + ) + + self.assertAlmostEqual( + res["abs_RT"][0], 86.824, places=2, + ) + self.assertAlmostEqual( + res["abs_DERT"][1], 220.965, places=2, + ) + self.assertAlmostEqual( + res["rel_RT"][2], 12.491, places=2, + ) + self.assertAlmostEqual( + res["rel_DERT"][3], 7.373, places=2, + ) + + # Change n_firings + res = compute_thresholds( + emgfile=emgfile, + event_="rt_dert", + type_="abs_rel", + n_firings=5, + mvc=1234, + ) + + self.assertAlmostEqual( + res["abs_RT"][0], 170.833, places=2, + ) + self.assertAlmostEqual( + res["abs_DERT"][1], 244.561, places=2, + ) + self.assertAlmostEqual( + res["rel_RT"][2], 14.677, places=2, + ) + self.assertAlmostEqual( + res["rel_DERT"][3], 9.313, places=2, + ) + + # Change event_ + res = compute_thresholds( + emgfile=emgfile, + event_="rt", + type_="rel", + n_firings=5, + mvc=1234, + ) + + self.assertAlmostEqual( + res["rel_RT"][0], 13.843, places=2, + ) + + def test_compute_dr(self): + """ + Test the compute_dr function with the samplefile. + """ + + # Load the decomposed samplefile + emgfile = emg_from_samplefile() + + # Ramps duration + t_ramps = 10 * emgfile["FSAMP"] + + # Default parameters + res = compute_dr( + emgfile=emgfile, + n_firings_RecDerec=4, + n_firings_steady=10, + start_steady=0 + t_ramps, + end_steady=emgfile["EMG_LENGTH"] - t_ramps, + event_="rec_derec_steady", + ) + + self.assertAlmostEqual( + res["DR_rec"][0], 3.341, places=2, + ) + self.assertAlmostEqual( + res["DR_derec"][1], 4.662, places=2, + ) + self.assertAlmostEqual( + res["DR_start_steady"][2], 8.793, places=2, + ) + self.assertAlmostEqual( + res["DR_end_steady"][3], 10.718, places=2, + ) + self.assertAlmostEqual( + res["DR_all_steady"][4], 10.693, places=2, + ) + self.assertAlmostEqual( + res["DR_all"][3], 10.693, places=2, + ) + + # Change n_firings_RecDerec + res = compute_dr( + emgfile=emgfile, + n_firings_RecDerec=10, + n_firings_steady=10, + start_steady=0 + t_ramps, + end_steady=emgfile["EMG_LENGTH"] - t_ramps, + event_="rec_derec_steady", + ) + + self.assertAlmostEqual( + res["DR_rec"][0], 7.031, places=2, + ) + self.assertAlmostEqual( + res["DR_derec"][1], 5.596, places=2, + ) + + res = compute_dr( + emgfile=emgfile, + n_firings_RecDerec=1, + n_firings_steady=10, + start_steady=0 + t_ramps, + end_steady=emgfile["EMG_LENGTH"] - t_ramps, + event_="rec_derec_steady", + ) + + self.assertTrue(np.isnan(res["DR_rec"][0])) + self.assertTrue(np.isnan(res["DR_derec"][0])) + + # Change n_firings_steady + res = compute_dr( + emgfile=emgfile, + n_firings_RecDerec=4, + n_firings_steady=36, + start_steady=0 + t_ramps, + end_steady=emgfile["EMG_LENGTH"] - t_ramps, + event_="rec_derec_steady", + ) + + self.assertAlmostEqual( + res["DR_start_steady"][0], 9.971, places=2, + ) + self.assertAlmostEqual( + res["DR_end_steady"][1], 6.806, places=2, + ) + + res = compute_dr( + emgfile=emgfile, + n_firings_RecDerec=1, + n_firings_steady=1, + start_steady=0 + t_ramps, + end_steady=emgfile["EMG_LENGTH"] - t_ramps, + event_="rec_derec_steady", + ) + + self.assertAlmostEqual( + res["DR_start_steady"][0], 7.474, places=2, + ) + self.assertAlmostEqual( + res["DR_end_steady"][1], 7.062, places=2, + ) + + # Change idr_range + res = compute_dr( + emgfile=emgfile, + n_firings_RecDerec=4, + n_firings_steady=10, + start_steady=0 + t_ramps, + end_steady=emgfile["EMG_LENGTH"] - t_ramps, + event_="rec_derec_steady", + idr_range=[7, 10], + ) + + self.assertTrue(np.isnan(res["DR_end_steady"][0])) + self.assertAlmostEqual( + res["DR_all"][1], 7.644, places=2, + ) + + def test_basic_mus_properties(self): + """ + Test the basic_mus_properties function with the samplefile. + """ + + # Load the decomposed samplefile + emgfile = emg_from_samplefile() + + # Ramps duration + t_ramps = 10 * emgfile["FSAMP"] + + # Default parameters + res = basic_mus_properties( + emgfile=emgfile, + n_firings_rt_dert=1, + n_firings_RecDerec=4, + n_firings_steady=10, + start_steady=0 + t_ramps, + end_steady=emgfile["EMG_LENGTH"] - t_ramps, + accuracy="default", + ignore_negative_ipts=False, + constrain_pulses=[True, 3], + mvc=1234, + ) + + self.assertAlmostEqual( + res["MVC"][0], 1234.0, places=0, + ) + self.assertAlmostEqual( + res["Accuracy"][1], 0.955, places=2, + ) + self.assertAlmostEqual( + res["avg_Accuracy"][0], 0.914, places=2, + ) + self.assertAlmostEqual( + res["COV_steady"][0], 1.316, places=2, + ) + + # Change accuracy estimation + res = basic_mus_properties( + emgfile=emgfile, + n_firings_rt_dert=1, + n_firings_RecDerec=4, + n_firings_steady=10, + start_steady=0 + t_ramps, + end_steady=emgfile["EMG_LENGTH"] - t_ramps, + accuracy="SIL_PNR", + ignore_negative_ipts=True, + constrain_pulses=[True, 3], + mvc=1234, + ) + + self.assertAlmostEqual( + res["SIL"][1], 0.830, places=2, + ) + self.assertAlmostEqual( + res["avg_SIL"][0], 0.676, places=2, + ) + self.assertAlmostEqual( + res["PNR"][4], 28.469, places=2, + ) + self.assertAlmostEqual( + res["avg_PNR"][0], 29.113, places=2, + ) + + res = basic_mus_properties( + emgfile=emgfile, + n_firings_rt_dert=1, + n_firings_RecDerec=4, + n_firings_steady=10, + start_steady=0 + t_ramps, + end_steady=emgfile["EMG_LENGTH"] - t_ramps, + accuracy="PNR", + ignore_negative_ipts=True, + constrain_pulses=[False, 3], + mvc=1234, + ) + + self.assertAlmostEqual( + res["PNR"][4], 28.079, places=2, + ) + self.assertAlmostEqual( + res["avg_PNR"][0], 29.895, places=2, + ) + + # Change idr_range + res = basic_mus_properties( + emgfile=emgfile, + n_firings_rt_dert=1, + n_firings_RecDerec=4, + n_firings_steady=10, + start_steady=0 + t_ramps, + end_steady=emgfile["EMG_LENGTH"] - t_ramps, + idr_range=[7, 10], + accuracy="default", + ignore_negative_ipts=False, + constrain_pulses=[True, 3], + mvc=1234, + ) + + self.assertTrue(np.isnan(res["DR_end_steady"][0])) + self.assertAlmostEqual( + res["DR_all"][1], 7.644, places=2, + ) + self.assertAlmostEqual( + res["COVisi_steady"][2], 7.677, places=2, + ) + + def test_compute_covisi(self): + """ + Test the compute_dr function with the samplefile. + """ + + # Load the decomposed samplefile + emgfile = emg_from_samplefile() + + # Ramps duration + t_ramps = 10 * emgfile["FSAMP"] + + # Default parameters + res = compute_covisi( + emgfile=emgfile, + n_firings_RecDerec=4, + start_steady=0 + t_ramps, + end_steady=emgfile["EMG_LENGTH"] - t_ramps, + event_="rec_derec_steady", + single_mu_number=-1, + ) + + self.assertAlmostEqual( + res["COVisi_rec"][0], 67.000, places=2, + ) + self.assertAlmostEqual( + res["COVisi_derec"][1], 24.007, places=2, + ) + self.assertAlmostEqual( + res["COVisi_steady"][2], 8.700, places=2, + ) + self.assertAlmostEqual( + res["COVisi_all"][3], 19.104, places=2, + ) + + # Change n_firings_RecDerec and event_ + res = compute_covisi( + emgfile=emgfile, + n_firings_RecDerec=1, + start_steady=0 + t_ramps, + end_steady=emgfile["EMG_LENGTH"] - t_ramps, + event_="rec_derec", + single_mu_number=-1, + ) + + self.assertTrue(np.isnan(res["COVisi_rec"][0])) + self.assertTrue(np.isnan(res["COVisi_derec"][0])) + + # Change single_mu_number + res = compute_covisi( + emgfile=emgfile, + n_firings_RecDerec=4, + start_steady=0 + t_ramps, + end_steady=emgfile["EMG_LENGTH"] - t_ramps, + event_="rec_derec_steady", + single_mu_number=3, + ) + + self.assertAlmostEqual( + res["COVisi_all"][0], 19.104, places=2, + ) + + # Change idr_range + res = compute_covisi( + emgfile=emgfile, + n_firings_RecDerec=4, + start_steady=0 + t_ramps, + end_steady=emgfile["EMG_LENGTH"] - t_ramps, + event_="rec_derec_steady", + idr_range=[7, 10], + single_mu_number=-1, + ) + + self.assertTrue(np.isnan(res["COVisi_rec"][0])) + self.assertAlmostEqual( + res["COVisi_steady"][2], 7.677, places=2, + ) + + def test_compute_drvariability(self): + """ + Test the compute_drvariability function with the samplefile. + """ + + # Load the decomposed samplefile + emgfile = emg_from_samplefile() + + # Ramps duration + t_ramps = 10 * emgfile["FSAMP"] + + # Default parameters + res = compute_drvariability( + emgfile=emgfile, + n_firings_RecDerec=4, + start_steady=0 + t_ramps, + end_steady=emgfile["EMG_LENGTH"] - t_ramps, + event_="rec_derec_steady", + ) + + self.assertAlmostEqual( + res["DRvar_rec"][0], 109.254, places=2, + ) + self.assertAlmostEqual( + res["DRvar_derec"][1], 21.662, places=2, + ) + self.assertAlmostEqual( + res["DRvar_steady"][2], 8.840, places=2, + ) + self.assertAlmostEqual( + res["DRvar_all"][3], 12.803, places=2, + ) + + # Change n_firings_RecDerec + res = compute_drvariability( + emgfile=emgfile, + n_firings_RecDerec=1, + start_steady=0 + t_ramps, + end_steady=emgfile["EMG_LENGTH"] - t_ramps, + event_="rec_derec_steady", + ) + + self.assertTrue(np.isnan(res["DRvar_rec"][0])) + self.assertTrue(np.isnan(res["DRvar_derec"][1])) + + # Change idr_range + res = compute_drvariability( + emgfile=emgfile, + n_firings_RecDerec=4, + start_steady=0 + t_ramps, + end_steady=emgfile["EMG_LENGTH"] - t_ramps, + event_="rec_derec_steady", + idr_range=[7, 10], + ) + + self.assertTrue(np.isnan(res["DRvar_rec"][0])) + self.assertAlmostEqual( + res["DRvar_all"][1], 6.466, places=2, + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/openhdemg/tests/unit/test_pic.py b/openhdemg/tests/unit/test_pic.py new file mode 100644 index 0000000..736a5b5 --- /dev/null +++ b/openhdemg/tests/unit/test_pic.py @@ -0,0 +1,99 @@ +""" +To run the tests using unittest, execute from the openhdemg/tests directory: + python -m unittest discover + +First, you should dowload all the files necessary for the testing and store +them inside openhdemg/tests/fixtures. The files are available at: +https://drive.google.com/drive/folders/1suCZSils8rSCs2E3_K25vRCbN3AFDI7F?usp=sharing + +IMPORTANT: Do not alter the content of the dowloaded folder! + +WARNING!!! Since the library's functions perform complex tasks and return +complex data structures, these tests can verify that no critical errors occur, +but the accuracy of each function must be assessed independently upon creation, +or at each revision of the code. + +WARNING!!! - UNTESTED FUNCTIONS: none +""" + + +import unittest +from openhdemg.tests.unit.functions_for_unit_test import get_directories as getd + +from openhdemg.library.openfiles import emg_from_samplefile + +from openhdemg.library.tools import ( + compute_svr, +) + +from openhdemg.library.pic import ( + compute_deltaf, +) + +import numpy as np + + +class TestPIC(unittest.TestCase): + """ + Test the functions/classes in the pic module. + """ + + def test_compute_deltaf(self): + """ + Test the compute_deltaf function with the samplefile. + """ + + # Load the decomposed samplefile + emgfile = emg_from_samplefile() + + # Generate smooth discharge rate estimates + svrfits = compute_svr(emgfile) + + # Default parameters, test_unit_average + res = compute_deltaf( + emgfile, + smoothfits=svrfits["gensvr"], + average_method="test_unit_average", + normalisation="False", + recruitment_difference_cutoff=1.0, + corr_cutoff=0.7, + controlunitmodulation_cutoff=0.5, + clean=True, + ) + + expected_values = [np.nan, 2.709522, 1.838382, np.nan, np.nan] + + for ii in range(len(expected_values)): + if np.isnan(expected_values[ii]): + self.assertTrue(np.isnan(res["dF"][ii])) + else: + self.assertAlmostEqual( + res["dF"][ii], expected_values[ii], places=2, + ) + + # Compute for all MU pairs + res = compute_deltaf( + emgfile, + smoothfits=svrfits["gensvr"], + average_method="all", + normalisation="False", + recruitment_difference_cutoff=1.0, + corr_cutoff=0.7, + controlunitmodulation_cutoff=0.5, + clean=True, + ) + + expected_values = [np.nan, np.nan, np.nan, np.nan, 2.709522, np.nan, + np.nan, 2.127461, 1.549303, np.nan] + + for ii in range(len(expected_values)): + if np.isnan(expected_values[ii]): + self.assertTrue(np.isnan(res["dF"][ii])) + else: + self.assertAlmostEqual( + res["dF"][ii], expected_values[ii], places=2, + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/requirements.txt b/requirements.txt index 9a01b8d..5703586 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/setup.py b/setup.py index 853a955..105d373 100644 --- a/setup.py +++ b/setup.py @@ -1,13 +1,12 @@ -# Copyright (C) 2023 Giacomo Valli, Paul Ritsche +# Copyright (C) 2022 - 2024. The openhdemg community. -# For the openhdemg version # To read the content of the README or description file from pathlib import Path # To install required dependencies from setuptools import setup -import openhdemg as emg +import openhdemg INSTALL_REQUIRES = [ "customtkinter==5.2.1", @@ -17,11 +16,10 @@ "openpyxl==3.1.2", "pandas==2.0.3", "pandastable==0.13.1", - "pyperclip==1.8.2", - # "pre-commit==3.7.0", "scipy==1.11.3", "seaborn==0.13.0", - "joblib==1.3.2", + "scikit-learn==1.5.0", + # "pre-commit==3.7.0", ] PACKAGES = [ @@ -69,7 +67,7 @@ "Source Code": "https://github.com/GiacomoValliPhD/openhdemg", "Bug Tracker": "https://github.com/GiacomoValliPhD/openhdemg/issues", }, - version=emg.__version__, + version=openhdemg.__version__, install_requires=INSTALL_REQUIRES, include_package_data=True, packages=PACKAGES,