diff --git a/pages/5_TOPP-Workflow.py b/pages/5_TOPP-Workflow.py index 8ef94be..0f5f53c 100644 --- a/pages/5_TOPP-Workflow.py +++ b/pages/5_TOPP-Workflow.py @@ -13,14 +13,14 @@ t = st.tabs(["📁 **File Upload**", "⚙️ **Configure**", "🚀 **Run**", "📊 **Results**"]) with t[0]: - wf.ui.show_file_upload_section() + wf.show_file_upload_section() with t[1]: - wf.ui.show_parameter_section() + wf.show_parameter_section() with t[2]: - wf.ui.show_execution_section() + wf.show_execution_section() with t[3]: - wf.ui.show_results_section() + wf.show_results_section() diff --git "a/pages/6_\360\237\223\226_TOPP-Workflow_Docs.py" "b/pages/6_\360\237\223\226_TOPP-Workflow_Docs.py" index 3d6ce30..e3405f2 100644 --- "a/pages/6_\360\237\223\226_TOPP-Workflow_Docs.py" +++ "b/pages/6_\360\237\223\226_TOPP-Workflow_Docs.py" @@ -1,7 +1,7 @@ import streamlit as st from src.Workflow import Workflow from src.workflow.StreamlitUI import StreamlitUI -from src.workflow.Files import Files +from src.workflow.FileManager import FileManager from src.workflow.CommandExecutor import CommandExecutor from src.common import page_setup from inspect import getsource @@ -30,16 +30,16 @@ with st.expander("**Example User Interface**", True): t = st.tabs(["📁 **File Upload**", "⚙️ **Configure**", "🚀 **Run**", "📊 **Results**"]) with t[0]: - wf.ui.show_file_upload_section() + wf.show_file_upload_section() with t[1]: - wf.ui.show_parameter_section() + wf.show_parameter_section() with t[2]: - wf.ui.show_execution_section() + wf.show_execution_section() with t[3]: - wf.ui.show_results_section() + wf.show_results_section() st.markdown( """ @@ -47,7 +47,7 @@ This repository contains a module in `src/workflow` that provides a framework for building and running analysis workflows. -The `WorkflowManager` class provides the core workflow logic. It uses the `Logger`, `Files`, `DirectoryManager`, `ParameterManager`, and `CommandExecutor` classes to setup a complete workflow logic. 
+The `WorkflowManager` class provides the core workflow logic. It uses the `Logger`, `FileManager`, `ParameterManager`, and `CommandExecutor` classes to setup a complete workflow logic. To build your own workflow edit the file `src/TOPPWorkflow.py`. Use any streamlit components such as tabs (as shown in example), columns, or even expanders to organize the helper functions for displaying file upload and parameter widgets. @@ -65,15 +65,12 @@ > **`self.logger`:** object of type `Logger` to write any output to a log file during workflow execution. -Handling input and output files in the `Workflow.execution` method for processes is done with the `Files` class, handling file types and creation of output directories. +> **`self.file_manager`:** object of type `FileManager` to handle file types and creation of output directories. """ ) with st.expander("**Complete example for custom Workflow class**", expanded=False): - st.code( -getsource(Workflow) - ) - + st.code(getsource(Workflow)) st.markdown( """ @@ -81,7 +78,7 @@ All input files for the workflow will be stored within the workflow directory in the subdirectory `input-files` within it's own subdirectory for the file type. -The subdirectory name will be determined by a **key** that is defined in the `self.ui.upload` method. The uploaded files are available by the specific key for parameter input widgets and accessible while building the workflow. +The subdirectory name will be determined by a **key** that is defined in the `self.ui.upload_widget` method. The uploaded files are available by the specific key for parameter input widgets and accessible while building the workflow. Calling this method will create a complete file upload widget section with the following components: @@ -92,13 +89,12 @@ Fallback files(s) can be specified, which will be used if the user doesn't upload any files. This can be useful for example for database files where a default is provided. 
""") -st.code( -getsource(Workflow.upload) -) +st.code(getsource(Workflow.upload)) + st.info("💡 Use the same **key** for parameter widgets, to select which of the uploaded files to use for analysis.") with st.expander("**Code documentation:**", expanded=True): - st.help(StreamlitUI.upload) + st.help(StreamlitUI.upload_widget) st.markdown( """ @@ -155,39 +151,44 @@ """ ## Building the Workflow -Building the workflow involves **calling all (TOPP) tools** using **`self.executor`** with **input and output files** based on the **`Files`** class. For TOPP tools non-input-output parameters are handled automatically. Parameters for other processes and workflow logic can be accessed via widget keys (set in the parameter section) in the **`self.params`** dictionary. +Building the workflow involves **calling all (TOPP) tools** using **`self.executor`** with **input and output files** based on the **`FileManager`** class. For TOPP tools non-input-output parameters are handled automatically. Parameters for other processes and workflow logic can be accessed via widget keys (set in the parameter section) in the **`self.params`** dictionary. -### Files +### FileManager -The `Files` class serves as an interface for unified input and output files with useful functionality specific to building workflows, such as **setting a (new) file type** and **subdirectory in the workflows result directory**. +The `FileManager` class serves as an interface for unified input and output files with useful functionality specific to building workflows, such as **setting a (new) file type** and **subdirectory in the workflows result directory**. -The `Files` object contains all file paths in the collection as strings. It can be initialized with a list of file paths or with a single file path. The file path can be either of type `str` or `pathlib.Path`. +Use the **`get_files`** method to get a list of all file paths as strings. 
-All file paths in a `Files` object can be collected in a list to be passed to a (TOPP) tool which can handle multiple input files (see **Running commands/Run TOPP tools** section for examples). +Optionally set the following parameters to modify the files: + +- **set_file_type** (str): set new file types and result subdirectory. +- **set_results_dir** (str): set a new subdirectory in the workflows result directory. +- **collect** (bool): collect all files into a single list. Will return a list with a single entry, which is a list of all files. Useful to pass to tools which can handle multiple input files at once. """) st.code( """ -# Creating a File object for input mzML files. -mzML_files = Files(self.params["mzML-files]) +# Get all file paths as strings from self.params entry. +mzML_files = self.file_manager.get_files(self.params["mzML-files"]) # mzML_files = ['../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Control.mzML', '../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Treatment.mzML'] # Creating output files for a TOPP tool, setting a new file type and result subdirectory name. -feature_detection_out = Files(mzML_files, set_file_type="featureXML", set_results_dir="feature-detection") +feature_detection_out = self.file_manager.get_files(mzML_files, set_file_type="featureXML", set_results_dir="feature-detection") # feature_detection_out = ['../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Control.featureXML', '../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Treatment.featureXML'] # Setting a name for the output directory automatically (useful if you never plan to access these files in the results section). 
-feature_detection_out = Files(mzML_files, set_file_type="featureXML", set_results_dir="auto") +feature_detection_out = self.file_manager.get_files(mzML_files, set_file_type="featureXML", set_results_dir="auto") # feature_detection_out = ['../workspaces-streamlit-template/default/topp-workflow/results/6DUd/Control.featureXML', '../workspaces-streamlit-template/default/topp-workflow/results/6DUd/Treatment.featureXML'] -# Combining all mzML files to be passed to a TOPP tool in a single run. Does not change the Files object. -# mzML_files.collect() = [['../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Control.mzML', '../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Treatment.mzML']] +# Combining all mzML files to be passed to a TOPP tool in a single run. Using "collected" files as argument for self.file_manager.get_files will "un-collect" them. +mzML_files = self.file_manager.get_files(mzML_files, collect=True) +# mzML_files = [['../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Control.mzML', '../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Treatment.mzML']] """ ) with st.expander("**Code documentation**", expanded=True): - st.help(Files.__init__) - st.help(Files.collect) + st.help(FileManager.get_files) + st.markdown( """ ### Running commands @@ -211,15 +212,23 @@ **3. Run TOPP tools** -The `self.executor.run_topp` method takes a TOPP tool name as input and a dictionary of input and output files as input. The **keys** need to match the actual input and output parameter names of the TOPP tool. The **values** should be of type `Files`. All other **non-default parameters (from input widgets)** will be passed to the TOPP tool automatically. +The `self.executor.run_topp` method takes a TOPP tool name as input and a dictionary of input and output files as input. The **keys** need to match the actual input and output parameter names of the TOPP tool. 
The **values** should be of type `FileManager`. All other **non-default parameters (from input widgets)** will be passed to the TOPP tool automatically. -Depending on the number of input files, the TOPP tool will be run either in parallel or in a single run (using **`Files.collect`**). +Depending on the number of input files, the TOPP tool will be run either in parallel or in a single run (using **`FileManager.collect`**). """) +st.info("""💡 **Input and output file order** + +In many tools, a single input file is processed to produce a single output file. +When dealing with lists of input or output files, the convention is that +files are paired based on their order. For instance, the n-th input file is +assumed to correspond to the n-th output file, maintaining a structured +relationship between input and output data. +""") st.code(""" # e.g. FeatureFinderMetabo takes single input files -in_files = Files(["sample1.mzML", "sample2.mzML"]) -out_files = Files(in_files, set_file_type="featureXML", set_results_dir="feature-detection") +in_files = self.file_manager.get_files(["sample1.mzML", "sample2.mzML"]) +out_files = self.file_manager.get_files(in_files, set_file_type="featureXML", set_results_dir="feature-detection") # Run FeatureFinderMetabo tool with input and output files in parallel for each pair of input/output files. self.executor.run_topp("FeatureFinderMetabo", input_output={"in": in_files, "out": out_files}) @@ -227,8 +236,10 @@ # FeaturFinderMetabo -in sample2.mzML -out workspace-dir/results/feature-detection/sample2.featureXML # Run SiriusExport tool with mutliple input and output files. 
-out = Files("sirius.ms") -self.executor.run_topp("SiriusExport", {"in": in_files.collect(), "in_featureinfo": out_files.collect(), "out": out_se}) +out_se = self.file_manager.get_files("sirius.ms", set_results_dir="sirius-export") +self.executor.run_topp("SiriusExport", {"in": self.file_manager.get_files(in_files, collect=True), + "in_featureinfo": self.file_manager.get_files(out_files, collect=True), + "out": out_se}) # SiriusExport -in sample1.mzML sample2.mzML -in_featureinfo sample1.featureXML sample2.featureXML -out sirius.ms """) @@ -242,7 +253,7 @@ st.code(""" # e.g. example Python tool which modifies mzML files in place based on experimental design -self.ui.input_python(script_file="example", input_output={"in": in_mzML, "in_experimantal_design": Files(["path/to/experimantal-design.tsv"])}) +self.executor.run_python(script_file="example", input_output={"in": in_mzML, "in_experimantal_design": self.file_manager.get_files(["path/to/experimantal-design.tsv"])}) """) st.markdown("**Example for a complete workflow section:**") @@ -251,69 +262,138 @@ getsource(Workflow.execution) ) -with st.expander("**Example output (truncated) of the workflow code above**"): +with st.expander("**Code documentation**", expanded=True): + st.help(CommandExecutor.run_command) + st.help(CommandExecutor.run_multiple_commands) + st.help(CommandExecutor.run_topp) + st.help(CommandExecutor.run_python) + +with st.expander("**Example output of the complete example workflow**"): st.code(""" -Starting workflow... +STARTING WORKFLOW Number of input mzML files: 2 Running 2 commands in parallel... 
Running command: -FeatureFinderMetabo -in ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Control.mzML -out ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Control.featureXML -algorithm:common:noise_threshold_int 1000.0 +FeatureFinderMetabo -in ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Treatment.mzML -out ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Treatment.featureXML -algorithm:common:chrom_peak_snr 4.0 -algorithm:common:noise_threshold_int 1000.0 Waiting for command to finish... Running command: -FeatureFinderMetabo -in ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Treatment.mzML -out ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Treatment.featureXML -algorithm:common:noise_threshold_int 1000.0 +FeatureFinderMetabo -in ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Control.mzML -out ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Control.featureXML -algorithm:common:chrom_peak_snr 4.0 -algorithm:common:noise_threshold_int 1000.0 Waiting for command to finish... 
-Total time to run command: 0.56 seconds +Process finished: +FeatureFinderMetabo -in ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Treatment.mzML -out ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Treatment.featureXML -algorithm:common:chrom_peak_snr 4.0 -algorithm:common:noise_threshold_int 1000.0 +Total time to run command: 0.55 seconds -Console log: +Progress of 'loading mzML': + Progress of 'loading spectra list': -# FeatureFinderMetabo output (removed for this docs example) + 89.06 % + -- done [took 0.17 s (CPU), 0.17 s (Wall)] -- + Progress of 'loading chromatogram list': -Total time to run command: 0.59 seconds + -- done [took 0.00 s (CPU), 0.00 s (Wall)] -- -Console log: +-- done [took 0.18 s (CPU), 0.18 s (Wall) @ 40.66 MiB/s] -- +Progress of 'mass trace detection': -# FeatureFinderMetabo output (removed for this docs example) +-- done [took 0.01 s (CPU), 0.01 s (Wall)] -- +Progress of 'elution peak detection': -Total time to run 2 commands: 0.59 seconds +-- done [took 0.07 s (CPU), 0.07 s (Wall)] -- +Progress of 'assembling mass traces to features': +Loading metabolite isotope model with 5% RMS error -Running 2 commands in parallel... +-- done [took 0.04 s (CPU), 0.04 s (Wall)] -- +-- FF-Metabo stats -- +Input traces: 1382 +Output features: 1095 (total trace count: 1382) +FeatureFinderMetabo took 0.47 s (wall), 0.90 s (CPU), 0.43 s (system), 0.47 s (user); Peak Memory Usage: 88 MB. -Running command: -MetaboliteAdductDecharger -in ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Control.featureXML -out_fm ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Control.featureXML -Waiting for command to finish... 
-Running command: -MetaboliteAdductDecharger -in ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Treatment.featureXML -out_fm ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Treatment.featureXML -Waiting for command to finish... +Process finished: +FeatureFinderMetabo -in ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Control.mzML -out ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Control.featureXML -algorithm:common:chrom_peak_snr 4.0 -algorithm:common:noise_threshold_int 1000.0 +Total time to run command: 0.60 seconds + +Progress of 'loading mzML': + Progress of 'loading spectra list': + + 77.09 % + -- done [took 0.16 s (CPU), 0.16 s (Wall)] -- + Progress of 'loading chromatogram list': + + -- done [took 0.00 s (CPU), 0.00 s (Wall)] -- + +-- done [took 0.17 s (CPU), 0.17 s (Wall) @ 43.38 MiB/s] -- +Progress of 'mass trace detection': + +-- done [took 0.02 s (CPU), 0.02 s (Wall)] -- +Progress of 'elution peak detection': -Total time to run command: 12.22 seconds +-- done [took 0.07 s (CPU), 0.07 s (Wall)] -- +Progress of 'assembling mass traces to features': +Loading metabolite isotope model with 5% RMS error -Total time to run command: 15.80 seconds +-- done [took 0.05 s (CPU), 0.05 s (Wall)] -- +-- FF-Metabo stats -- +Input traces: 1521 +Output features: 1203 (total trace count: 1521) +FeatureFinderMetabo took 0.51 s (wall), 0.90 s (CPU), 0.45 s (system), 0.45 s (user); Peak Memory Usage: 88 MB. 
-Total time to run 2 commands: 15.80 seconds + +Total time to run 2 commands: 0.60 seconds Running command: -SiriusExport -in ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Control.mzML ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Treatment.mzML -in_featureinfo ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Control.featureXML ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Treatment.featureXML -out ../workspaces-streamlit-template/default/topp-workflow/results/sirius-export/sirius-export.ms +python src/python-tools/example.py ../workspaces-streamlit-template/default/topp-workflow/example.json Waiting for command to finish... -Total time to run command: 0.67 seconds +Process finished: +python src/python-tools/example.py ../workspaces-streamlit-template/default/topp-workflow/example.json +Total time to run command: 0.04 seconds + +Writing stdout which will get logged... 
+Parameters for this example Python tool: +{ + "in": [ + "../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Control.mzML", + "../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Treatment.mzML" + ], + "out": [], + "number-slider": 6, + "selectbox-example": "c", + "adavanced-input": 5, + "checkbox": true +} -Console log: -# SiriusExport output (removed for this docs example) +Running command: +SiriusExport -in ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Control.mzML ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Treatment.mzML -in_featureinfo ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Control.featureXML ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Treatment.featureXML -out ../workspaces-streamlit-template/default/topp-workflow/results/sirius-export/sirius.ms +Waiting for command to finish... -COMPLETE +Process finished: +SiriusExport -in ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Control.mzML ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Treatment.mzML -in_featureinfo ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Control.featureXML ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Treatment.featureXML -out ../workspaces-streamlit-template/default/topp-workflow/results/sirius-export/sirius.ms +Total time to run command: 0.65 seconds + +Number of features to be processed: 0 +Number of additional MS2 spectra to be processed: 0 +No MS1 spectrum for this precursor. Occurred 0 times. +0 spectra were skipped due to precursor charge below -1 and above +1. +Mono charge assumed and set to charge 1 with respect to current polarity 0 times. +0 features were skipped due to feature charge below -1 and above +1. +No MS1 spectrum for this precursor. 
Occurred 0 times. +0 spectra were skipped due to precursor charge below -1 and above +1. +Mono charge assumed and set to charge 1 with respect to current polarity 0 times. +0 features were skipped due to feature charge below -1 and above +1. + occurred 2 times +SiriusExport took 0.61 s (wall), 1.71 s (CPU), 1.06 s (system), 0.65 s (user); Peak Memory Usage: 88 MB. + occurred 2 times + + +WORKFLOW FINISHED """, language="neon") -with st.expander("**Code documentation**", expanded=True): - st.help(CommandExecutor.run_command) - st.help(CommandExecutor.run_multiple_commands) - st.help(CommandExecutor.run_topp) - st.help(CommandExecutor.run_python) diff --git a/src/Workflow.py b/src/Workflow.py index b6c035c..0cc3ccd 100644 --- a/src/Workflow.py +++ b/src/Workflow.py @@ -1,22 +1,21 @@ import streamlit as st from .workflow.WorkflowManager import WorkflowManager -from .workflow.Files import Files class Workflow(WorkflowManager): # Setup pages for upload, parameter, execution and results. # For layout use any streamlit components such as tabs (as shown in example), columns, or even expanders. def __init__(self) -> None: # Initialize the parent class with the workflow name. - super().__init__("TOPP Workflow") + super().__init__("TOPP Workflow", st.session_state["workspace"]) def upload(self)-> None: t = st.tabs(["MS data", "Example with fallback data"]) with t[0]: # Use the upload method from StreamlitUI to handle mzML file uploads. - self.ui.upload(key="mzML-files", name="MS data", file_type="mzML") + self.ui.upload_widget(key="mzML-files", name="MS data", file_type="mzML") with t[1]: # Example with fallback data (not used in workflow) - self.ui.upload(key="image", file_type="png", fallback="assets/OpenMS.png") + self.ui.upload_widget(key="image", file_type="png", fallback="assets/OpenMS.png") def configure(self) -> None: # Allow users to select mzML files for the analysis. 
@@ -43,14 +42,15 @@ def configure(self) -> None: self.ui.input_python("example") def execution(self) -> None: - # Wrap mzML files into a Files object for processing. - in_mzML = Files(self.params["mzML-files"], "mzML") + # Get mzML input files from self.params. + # Can be done without file manager, however, it ensures everything is correct. + in_mzML = self.file_manager.get_files(self.params["mzML-files"]) # Log any messages. self.logger.log(f"Number of input mzML files: {len(in_mzML)}") # Prepare output files for feature detection. - out_ffm = Files(in_mzML, "featureXML", "feature-detection") + out_ffm = self.file_manager.get_files(in_mzML, "featureXML", "feature-detection") # Run FeatureFinderMetabo tool with input and output files. self.executor.run_topp( @@ -61,7 +61,7 @@ def execution(self) -> None: if self.params["run-adduct-detection"]: # Run MetaboliteAdductDecharger for adduct detection, with disabled logs. - # Without a new Files object for output, the input files will be overwritten in this case. + # Without a new file list for output, the input files will be overwritten in this case. self.executor.run_topp( "MetaboliteAdductDecharger", {"in": out_ffm, "out_fm": out_ffm}, write_log=False ) @@ -70,10 +70,10 @@ def execution(self) -> None: self.executor.run_python("example", {"in": in_mzML}) # Prepare output file for SiriusExport. - out_se = Files(["sirius-export.ms"], "ms", "sirius-export") - - # Run SiriusExport tool with the collected files. 
- self.executor.run_topp("SiriusExport", {"in": in_mzML.collect(), "in_featureinfo": out_ffm.collect(), "out": out_se}) + out_se = self.file_manager.get_files("sirius.ms", set_results_dir="sirius-export") + self.executor.run_topp("SiriusExport", {"in": self.file_manager.get_files(in_mzML, collect=True), + "in_featureinfo": self.file_manager.get_files(out_ffm, collect=True), + "out": out_se}) def results(self) -> None: st.warning("Not implemented yet.") \ No newline at end of file diff --git a/src/workflow/CommandExecutor.py b/src/workflow/CommandExecutor.py index 767300a..734091b 100644 --- a/src/workflow/CommandExecutor.py +++ b/src/workflow/CommandExecutor.py @@ -4,7 +4,6 @@ import subprocess import threading from pathlib import Path -from .Files import Files from .Logger import Logger from .ParameterManager import ParameterManager import sys @@ -102,11 +101,11 @@ def run_command(self, command: list[str], write_log: bool = True) -> None: execution_time = end_time - start_time # Format the logging prefix - self.logger.log(f"Total time to run command: {execution_time:.2f} seconds") + self.logger.log(f"Process finished:\n"+' '.join(command)+f"\nTotal time to run command: {execution_time:.2f} seconds") # Log stdout if present if stdout and write_log: - self.logger.log(f"Console log:\n\n{stdout.decode()}") + self.logger.log(stdout.decode()) # Log stderr and raise an exception if errors occurred if stderr or process.returncode != 0: @@ -119,7 +118,13 @@ def run_topp(self, tool: str, input_output: dict, write_log: bool = True) -> Non Constructs and executes commands for the specified tool OpenMS TOPP tool based on the given input and output configurations. Ensures that all input/output file lists are of the same length, or single strings, to maintain consistency in command - execution. Supports executing commands either as single or multiple processes + execution. + In many tools, a single input file is processed to produce a single output file. 
+ When dealing with lists of input or output files, the convention is that + files are paired based on their order. For instance, the n-th input file is + assumed to correspond to the n-th output file, maintaining a structured + relationship between input and output data. + Supports executing commands either as single or multiple processes based on the input size. Args: @@ -238,7 +243,7 @@ def run_python(self, script_file: str, input_output: dict = {}, write_log: bool for k, v in params.items(): defaults[k.replace(f"{path.name}:", "")] = v for k, v in input_output.items(): - defaults[k] = v.files if isinstance(v, Files) else v + defaults[k] = v # save parameters to temporary JSON file tmp_params_files = Path(self.pid_dir.parent, f"{path.stem}.json") with open(tmp_params_files, "w", encoding="utf-8") as f: diff --git a/src/workflow/FileManager.py b/src/workflow/FileManager.py new file mode 100644 index 0000000..c0b6415 --- /dev/null +++ b/src/workflow/FileManager.py @@ -0,0 +1,186 @@ +from pathlib import Path +import string +import random +import shutil +from typing import Union, List +from .Logger import Logger + + +class FileManager: + """ + Manages file paths for operations such as changing file extensions, organizing files + into result directories, and handling file collections for processing tools. Designed + to be flexible for handling both individual files and lists of files, with integration + into a Streamlit workflow. + + Methods: + get_files: Returns a list of file paths as strings for the specified files, optionally with new file type and results subdirectory. + collect: Collects all files in a single list (e.g. to pass to tools which can handle multiple input files at once). + """ + + def __init__( + self, + workflow_dir: Path, + ): + """ + Initializes the FileManager object with the current workflow directory. 
+ """ + self.workflow_dir = workflow_dir + + def get_files( + self, + files: Union[List[Union[str, Path]], Path, str, List[List[str]]], + set_file_type: str = None, + set_results_dir: str = None, + collect: bool = False, + ) -> Union[List[str], List[List[str]]]: + """ + Returns a list of file paths as strings for the specified files. + Optionally sets or changes the file extension for all files to the + specified file type and changes the directory to a new subdirectory + in the workflow results directory. + + Args: + files (Union[List[Union[str, Path]], Path, str, List[List[str]]]): The list of file + paths to process. + set_file_type (str): The file extension to set for all files. + set_results_dir (str): The name of a subdirectory in the workflow + results directory to change to. If "auto" or "", a random name will be generated. + collect (bool): Whether to collect all files into a single list. Will return a list + with a single entry, which is a list of all files. Useful to pass to tools which + can handle multiple input files at once. + + Returns: + Union[List[str], List[List[str]]]: The (modified) files list. + """ + # Handle input single string + if isinstance(files, str): + files = [files] + # Handle input single Path object, can be directory or file + elif isinstance(files, Path): + if files.is_dir(): + files = [str(f) for f in files.iterdir()] + else: + files = [str(files)] + # Handle input list + elif isinstance(files, list): + # Can have one entry of strings (e.g. 
if it has been collected before by FileManager) + if isinstance(files[0], list): + files = files[0] + # Make sure every file path is a string + files = [str(f) for f in files if isinstance(f, Path) or isinstance(f, str)] + # Set new file type if required + if set_file_type is not None: + files = self._set_type(files, set_file_type) + # Set new results subdirectory if required + if set_results_dir is not None: + if set_results_dir == "auto": + set_results_dir = "" + files = self._set_dir(files, set_results_dir) + # Collect files into a single list if required + if collect: + files = [files] + # Raise error if no files have been detected + if not files: + raise ValueError( + f"No files found, can not set file type **{set_file_type}**, results_dir **{set_results_dir}** and collect **{collect}**." + ) + return files + + def _set_type(self, files: List[str], set_file_type: str) -> List[str]: + """ + Sets or changes the file extension for all files in the collection to the + specified file type. + + Args: + files (List[str]): The list of file paths to change the type for. + set_file_type (str): The file extension to set for all files. + + Returns: + List[str]: The files list with new type. + """ + + def change_extension(file_path, new_ext): + return Path(file_path).with_suffix("." + new_ext) + + for i in range(len(files)): + if isinstance(files[i], list): # If the item is a list + files[i] = [ + str(change_extension(file, set_file_type)) for file in files[i] + ] + elif isinstance(files[i], str): # If the item is a string + files[i] = str(change_extension(files[i], set_file_type)) + return files + + def _set_dir(self, files: List[str], subdir_name: str) -> List[str]: + """ + Sets the subdirectory within the results directory to store files. If the + subdirectory name is 'auto' or empty, generates a random subdirectory name. + Warns and overwrites if the subdirectory already exists. + + Args: + files (List[str]): The list of file paths to change the directory for. 
+ subdir_name (str): The name of the subdirectory within the results directory. + + Returns: + List[str]: The files list with new directory. + """ + if not subdir_name: + subdir_name = self._create_results_sub_dir(subdir_name) + else: + if Path(self.workflow_dir, "results", subdir_name).exists(): + Logger().log( + f"WARNING: Subdirectory already exists, will overwrite content: {subdir_name}" + ) + subdir_name = self._create_results_sub_dir(subdir_name) + + def change_subdir(file_path, subdir): + return Path(subdir, Path(file_path).name) + + for i in range(len(files)): + if isinstance(files[i], list): # If the item is a list + files[i] = [str(change_subdir(file, subdir_name)) for file in files[i]] + elif isinstance(files[i], str): # If the item is a string + files[i] = str(change_subdir(files[i], subdir_name)) + return files + + def _generate_random_code(self, length: int) -> str: + """Generate a random code of the specified length. + + Args: + length (int): Length of the random code. + + Returns: + str: Random code of the specified length. + """ + # Define the characters that can be used in the code + # Includes both letters and numbers + characters = string.ascii_letters + string.digits + + # Generate a random code of the specified length + random_code = "".join(random.choice(characters) for _ in range(length)) + + return random_code + + def _create_results_sub_dir(self, name: str = "") -> str: + """ + Creates a subdirectory within the results directory for storing files. If the + name is not specified or empty, generates a random name for the subdirectory. + + Args: + name (str, optional): The desired name for the subdirectory. + + Returns: + str: The path to the created subdirectory as a string. + """ + # create a directory (e.g. 
for results of a TOPP tool) within the results directory + # if name is empty string, auto generate a name + if not name: + name = self._generate_random_code(4) + # make sure the subdirectory does not exist in results yet + while Path(self.workflow_dir, "results", name).exists(): + name = self._generate_random_code(4) + path = Path(self.workflow_dir, "results", name) + shutil.rmtree(path, ignore_errors=True) + path.mkdir() + return str(path) diff --git a/src/workflow/Files.py b/src/workflow/Files.py deleted file mode 100644 index fbc205b..0000000 --- a/src/workflow/Files.py +++ /dev/null @@ -1,170 +0,0 @@ -from pathlib import Path -import string -import random -import shutil -import streamlit as st -from typing import Union, List -from .Logger import Logger - - -class Files: - """ - Manages file paths for operations such as changing file extensions, organizing files - into result directories, and handling file collections for processing tools. Designed - to be flexible for handling both individual files and lists of files, with integration - into a Streamlit workflow. - - Attributes: - files (List[str]): A list of file paths, initialized from various input formats. - - Methods: - collect: Collects all files in a single list (e.g. to pass to tools which can handle multiple input files at once). - """ - def __init__( - self, - files: Union[List[Union[str, Path]], Path, "Files"], - set_file_type: str = None, - set_results_dir: str = None, - ): - """ - Initializes the Files object with a collection of file paths, optional file type, - and results directory. Converts various input formats (single path, list of paths, - Files object) into a unified list of file paths. - - Args: - files (Union[List[Union[Union[str, Path]], Path, "Files"]): The initial collection - of file paths or a Files object. - set_file_type (str, optional): Set the file type/extension for the files. 
- _set_dir (str, optional): Set the directory to store processed results (creates a sub-directory of workflow results directory). If set to "auto", a name will be auto-generated. - """ - if isinstance(files, str): - self.files = [files] - elif isinstance(files, Files): - self.files = files.files.copy() - elif isinstance(files, Path): - if files.is_dir(): - self.files = [str(f) for f in files.iterdir()] - else: - self.files = [str(files)] - elif isinstance(files, list): - self.files = [str(f) for f in files if isinstance(f, Path) or isinstance(f, str)] - if set_file_type is not None: - self._set_type(set_file_type) - if set_results_dir is not None: - if set_results_dir == "auto": - set_results_dir = "" - self._set_dir(set_results_dir) - if not self.files: - raise ValueError(f"No files found with type {set_file_type}") - - def _set_type(self, set_file_type: str) -> None: - """ - Sets or changes the file extension for all files in the collection to the - specified file type. - - Args: - set_file_type (str): The file extension to set for all files. - """ - def change_extension(file_path, new_ext): - return Path(file_path).with_suffix("." + new_ext) - - for i in range(len(self.files)): - if isinstance(self.files[i], list): # If the item is a list - self.files[i] = [ - str(change_extension(file, set_file_type)) for file in self.files[i] - ] - elif isinstance(self.files[i], str): # If the item is a string - self.files[i] = str(change_extension(self.files[i], set_file_type)) - - def _set_dir(self, subdir_name: str) -> None: - """ - Sets the subdirectory within the results directory to store files. If the - subdirectory name is 'auto' or empty, generates a random subdirectory name. - Warns and overwrites if the subdirectory already exists. - - Args: - subdir_name (str): The name of the subdirectory within the results directory. 
- """ - if not subdir_name: - subdir_name = self._create_results_sub_dir(subdir_name) - else: - if Path(st.session_state["workflow-dir"], "results", subdir_name).exists(): - Logger().log( - f"WARNING: Subdirectory already exists, will overwrite content: {subdir_name}" - ) - subdir_name = self._create_results_sub_dir(subdir_name) - - def change_subdir(file_path, subdir): - return Path(subdir, Path(file_path).name) - - for i in range(len(self.files)): - if isinstance(self.files[i], list): # If the item is a list - self.files[i] = [ - str(change_subdir(file, subdir_name)) for file in self.files[i] - ] - elif isinstance(self.files[i], str): # If the item is a string - self.files[i] = str(change_subdir(self.files[i], subdir_name)) - - def _generate_random_code(self, length: int) -> int: - """Generate a random code of the specified length. - - Args: - length (int): Length of the random code. - - Returns: - int: Random code of the specified length. - """ - # Define the characters that can be used in the code - # Includes both letters and numbers - characters = string.ascii_letters + string.digits - - # Generate a random code of the specified length - random_code = "".join(random.choice(characters) for _ in range(length)) - - return random_code - - def _create_results_sub_dir(self, name: str = "") -> str: - """ - Creates a subdirectory within the results directory for storing files. If the - name is not specified or empty, generates a random name for the subdirectory. - - Args: - name (str, optional): The desired name for the subdirectory. - - Returns: - str: The path to the created subdirectory as a string. - """ - # create a directory (e.g. 
for results of a TOPP tool) within the results directory - # if name is empty string, auto generate a name - if not name: - name = self._generate_random_code(4) - # make sure the subdirectory does not exist in results yet - while Path(st.session_state["workflow-dir"], "results", name).exists(): - name = self._generate_random_code(4) - path = Path(st.session_state["workflow-dir"], "results", name) - shutil.rmtree(path, ignore_errors=True) - path.mkdir() - return str(path) - - def collect(self): - """ - Combines all files in the files list into a single list (e.g. to pass to tools which can handle multiple input files at once). - - Does not change the file collection. - - Returns: - List[List[str]]: The combined files list. - """ - return [self.files] - - def __repr__(self): - return self.files - - def __str__(self): - return str(self.files) - - def __len__(self): - return len(self.files) - - def __getitem__(self, index): - return self.files[index] diff --git a/src/workflow/StreamlitUI.py b/src/workflow/StreamlitUI.py index 0f10a99..3cbde8b 100644 --- a/src/workflow/StreamlitUI.py +++ b/src/workflow/StreamlitUI.py @@ -3,7 +3,6 @@ from pathlib import Path import shutil import subprocess -from .Files import Files from typing import Any, Union, List import json import sys @@ -12,26 +11,23 @@ from io import BytesIO import zipfile - class StreamlitUI: """ Provides an interface for Streamlit applications to handle file uploads, input selection, and parameter management for analysis workflows. It includes methods for uploading files, selecting input files from available ones, and generating various input widgets dynamically based on the specified parameters. - - The class is designed to work with pyOpenMS for mass spectrometry data analysis, - leveraging the ParameterManager for parameter persistence and the Files class - for file management. 
""" # Methods for Streamlit UI components - def __init__(self, workflow_manager): - self.workflow_manager = workflow_manager - self.workflow_dir = workflow_manager.workflow_dir - self.params = self.workflow_manager.parameter_manager.get_parameters_from_json() - - def upload( + def __init__(self, workflow_dir, logger, executor, paramter_manager): + self.workflow_dir = workflow_dir + self.logger = logger + self.executor = executor + self.parameter_manager = paramter_manager + self.params = self.parameter_manager.get_parameters_from_json() + + def upload_widget( self, key: str, file_type: str, @@ -139,7 +135,7 @@ def upload( ): shutil.rmtree(files_dir) del self.params[key] - with open(self.workflow_manager.parameter_manager.params_file, "w", encoding="utf-8") as f: + with open(self.parameter_manager.params_file, "w", encoding="utf-8") as f: json.dump(self.params, f, indent=4) st.rerun() elif not fallback: @@ -168,7 +164,7 @@ def select_input_file( if not path.exists(): st.warning(f"No **{name}** files!") return - options = Files([f for f in path.iterdir()]) + options = [str(f) for f in path.iterdir()] if key in self.params.keys(): self.params[key] = [f for f in self.params[key] if f in options] @@ -188,7 +184,7 @@ def input_widget( name: str = "input widget", help: str = None, widget_type: str = "auto", # text, textarea, number, selectbox, slider, checkbox, multiselect - options: Union[List[str], "Files"] = None, + options: List[str] = None, min_value: Union[int, float] = None, max_value: Union[int, float] = None, step_size: Union[int, float] = 1, @@ -209,28 +205,19 @@ def input_widget( widget_type (str, optional): Type of widget to create ('text', 'textarea', 'number', 'selectbox', 'slider', 'checkbox', 'multiselect', 'password', or 'auto'). - options (Union[List[str], "Files"], optional): Options for select/multiselect widgets. + options (List[str], optional): Options for select/multiselect widgets. 
min_value (Union[int, float], optional): Minimum value for number/slider widgets. max_value (Union[int, float], optional): Maximum value for number/slider widgets. step_size (Union[int, float], optional): Step size for number/slider widgets. display_file_path (bool, optional): Whether to display the full file path for file options. """ - def convert_files_to_str(input: Any) -> List[str]: - if isinstance(input, Files): - return input.files - else: - return input - def format_files(input: Any) -> List[str]: if not display_file_path and Path(input).exists(): return Path(input).name else: return input - default = convert_files_to_str(default) - options = convert_files_to_str(options) - if key in self.params.keys(): value = self.params[key] else: @@ -242,7 +229,7 @@ def format_files(input: Any) -> List[str]: elif widget_type == "selectbox": value = options[0] - key = f"{self.workflow_manager.parameter_manager.param_prefix}{key}" + key = f"{self.parameter_manager.param_prefix}{key}" if widget_type == "text": st.text_input(name, value=value, key=key, help=help) @@ -382,7 +369,7 @@ def input_TOPP( exclude_parameters (List[str], optional): List of parameter names to exclude from the widget. 
""" # write defaults ini files - ini_file_path = Path(self.workflow_manager.parameter_manager.ini_dir, f"{topp_tool_name}.ini") + ini_file_path = Path(self.parameter_manager.ini_dir, f"{topp_tool_name}.ini") if not ini_file_path.exists(): subprocess.call([topp_tool_name, "-write_ini", str(ini_file_path)]) # read into Param object @@ -437,7 +424,7 @@ def input_TOPP( if not st.session_state["advanced"] and p["advanced"]: continue - key = f"{self.workflow_manager.parameter_manager.topp_param_prefix}{p['key'].decode()}" + key = f"{self.parameter_manager.topp_param_prefix}{p['key'].decode()}" try: # bools @@ -641,13 +628,12 @@ def zip_and_download_files(self, directory: str): ) - def show_file_upload_section(self) -> None: - self.workflow_manager.upload() + def file_upload_section(self, custom_upload_function) -> None: + custom_upload_function() if st.button("⬇️ Download all uploaded files", use_container_width=True): self.ui.zip_and_download_files(Path(self.workflow_dir, "input-files")) - def show_parameter_section(self) -> None: - # c1.title(f"⚙️ Parameters") + def parameter_section(self, custom_paramter_function) -> None: st.toggle("Show advanced parameters", value=False, key="advanced") form = st.form( @@ -660,7 +646,7 @@ def show_parameter_section(self) -> None: cols[0].form_submit_button( label="Save parameters", - on_click=self.workflow_manager.parameter_manager.save_parameters, + on_click=self.parameter_manager.save_parameters, type="primary", use_container_width=True, ) @@ -668,37 +654,36 @@ def show_parameter_section(self) -> None: if cols[1].form_submit_button( label="Load default parameters", use_container_width=True ): - self.workflow_manager.parameter_manager.reset_to_default_parameters() + self.parameter_manager.reset_to_default_parameters() - # Load parameters - self.workflow_manager.configure() + custom_paramter_function() # Save parameters - self.workflow_manager.parameter_manager.save_parameters() + self.parameter_manager.save_parameters() - def 
show_execution_section(self) -> None: - if self.workflow_manager.executor.pid_dir.exists(): + def execution_section(self, start_workflow_function) -> None: + if self.executor.pid_dir.exists(): if st.button("Stop Workflow", type="primary", use_container_width=True): - self.workflow_manager.executor.stop() + self.executor.stop() st.rerun() else: st.button( "Start Workflow", type="primary", use_container_width=True, - on_click=self.workflow_manager.start_workflow, + on_click=start_workflow_function, ) - if self.workflow_manager.logger.log_file.exists(): - if self.workflow_manager.executor.pid_dir.exists(): + if self.logger.log_file.exists(): + if self.executor.pid_dir.exists(): with st.spinner("**Workflow running...**"): - with open(self.workflow_manager.logger.log_file, "r", encoding="utf-8") as f: + with open(self.logger.log_file, "r", encoding="utf-8") as f: st.code(f.read(), language="neon", line_numbers=True) time.sleep(2) st.rerun() else: st.markdown("**Workflow log file**") - with open(self.workflow_manager.logger.log_file, "r", encoding="utf-8") as f: + with open(self.logger.log_file, "r", encoding="utf-8") as f: st.code(f.read(), language="neon", line_numbers=True) - def show_results_section(self) -> None: - self.workflow_manager.results() \ No newline at end of file + def results_section(self, custom_results_function) -> None: + custom_results_function() \ No newline at end of file diff --git a/src/workflow/WorkflowManager.py b/src/workflow/WorkflowManager.py index 51cc9d1..3f70097 100644 --- a/src/workflow/WorkflowManager.py +++ b/src/workflow/WorkflowManager.py @@ -3,22 +3,21 @@ from .ParameterManager import ParameterManager from .CommandExecutor import CommandExecutor from .StreamlitUI import StreamlitUI +from .FileManager import FileManager import multiprocessing import shutil -import streamlit as st class WorkflowManager: # Core workflow logic using the above classes - def __init__(self, name: str = "Workflow Base"): + def __init__(self, name: str, 
workspace: str): self.name = name - self.workflow_dir = Path(st.session_state["workspace"], self.name.replace(" ", "-").lower()) - st.session_state["workflow-dir"] = str(self.workflow_dir) - self.parameter_manager = ParameterManager(self.workflow_dir) + self.workflow_dir = Path(workspace, name.replace(" ", "-").lower()) + self.file_manager = FileManager(self.workflow_dir) self.logger = Logger(self.workflow_dir) + self.parameter_manager = ParameterManager(self.workflow_dir) self.executor = CommandExecutor(self.workflow_dir, self.logger, self.parameter_manager) - self.ui = StreamlitUI(self) self.params = self.parameter_manager.get_parameters_from_json() - + self.ui = StreamlitUI(self.workflow_dir, self.logger, self.executor, self.parameter_manager) def start_workflow(self) -> None: """ @@ -39,18 +38,41 @@ def workflow_process(self) -> None: Workflow process. Logs start and end of the workflow and calls the execution method where all steps are defined. """ try: - self.logger.log("Starting workflow...") + self.logger.log("STARTING WORKFLOW") results_dir = Path(self.workflow_dir, "results") if results_dir.exists(): shutil.rmtree(results_dir) results_dir.mkdir(parents=True) self.execution() - self.logger.log("COMPLETE") + self.logger.log("WORKFLOW FINISHED") except Exception as e: self.logger.log(f"ERROR: {e}") # Delete pid dir path to indicate workflow is done shutil.rmtree(self.executor.pid_dir, ignore_errors=True) + def show_file_upload_section(self) -> None: + """ + Shows the file upload section of the UI with content defined in self.upload(). + """ + self.ui.file_upload_section(self.upload) + + def show_parameter_section(self) -> None: + """ + Shows the parameter section of the UI with content defined in self.configure(). + """ + self.ui.parameter_section(self.configure) + + def show_execution_section(self) -> None: + """ + Shows the execution section of the UI with content defined in self.execution(). 
+ """ + self.ui.execution_section(self.start_workflow) + + def show_results_section(self) -> None: + """ + Shows the results section of the UI with content defined in self.results(). + """ + self.ui.results_section(self.results) def upload(self) -> None: """