Skip to content

Commit

Permalink
remove streamlit from all workflow classes except for StreamlitUI, re…
Browse files Browse the repository at this point in the history
…work Files to FileManager
  • Loading branch information
axelwalter committed Feb 15, 2024
1 parent 7c939da commit b6030e8
Show file tree
Hide file tree
Showing 8 changed files with 422 additions and 314 deletions.
8 changes: 4 additions & 4 deletions pages/5_TOPP-Workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@

t = st.tabs(["📁 **File Upload**", "⚙️ **Configure**", "🚀 **Run**", "📊 **Results**"])
with t[0]:
wf.ui.show_file_upload_section()
wf.show_file_upload_section()

with t[1]:
wf.ui.show_parameter_section()
wf.show_parameter_section()

with t[2]:
wf.ui.show_execution_section()
wf.show_execution_section()

with t[3]:
wf.ui.show_results_section()
wf.show_results_section()

214 changes: 147 additions & 67 deletions pages/6_📖_TOPP-Workflow_Docs.py

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions src/Workflow.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import streamlit as st
from .workflow.WorkflowManager import WorkflowManager
from .workflow.Files import Files

class Workflow(WorkflowManager):
# Setup pages for upload, parameter, execution and results.
# For layout use any streamlit components such as tabs (as shown in example), columns, or even expanders.
def __init__(self) -> None:
# Initialize the parent class with the workflow name.
super().__init__("TOPP Workflow")
super().__init__("TOPP Workflow", st.session_state["workspace"])

def upload(self)-> None:
t = st.tabs(["MS data", "Example with fallback data"])
with t[0]:
# Use the upload method from StreamlitUI to handle mzML file uploads.
self.ui.upload(key="mzML-files", name="MS data", file_type="mzML")
self.ui.upload_widget(key="mzML-files", name="MS data", file_type="mzML")
with t[1]:
# Example with fallback data (not used in workflow)
self.ui.upload(key="image", file_type="png", fallback="assets/OpenMS.png")
self.ui.upload_widget(key="image", file_type="png", fallback="assets/OpenMS.png")

def configure(self) -> None:
# Allow users to select mzML files for the analysis.
Expand All @@ -43,14 +42,15 @@ def configure(self) -> None:
self.ui.input_python("example")

def execution(self) -> None:
# Wrap mzML files into a Files object for processing.
in_mzML = Files(self.params["mzML-files"], "mzML")
# Get mzML input files from self.params.
# Can be done without file manager, however, it ensures everything is correct.
in_mzML = self.file_manager.get_files(self.params["mzML-files"])

# Log any messages.
self.logger.log(f"Number of input mzML files: {len(in_mzML)}")

# Prepare output files for feature detection.
out_ffm = Files(in_mzML, "featureXML", "feature-detection")
out_ffm = self.file_manager.get_files(in_mzML, "featureXML", "feature-detection")

# Run FeatureFinderMetabo tool with input and output files.
self.executor.run_topp(
Expand All @@ -61,7 +61,7 @@ def execution(self) -> None:
if self.params["run-adduct-detection"]:

# Run MetaboliteAdductDecharger for adduct detection, with disabled logs.
# Without a new Files object for output, the input files will be overwritten in this case.
# Without a new file list for output, the input files will be overwritten in this case.
self.executor.run_topp(
"MetaboliteAdductDecharger", {"in": out_ffm, "out_fm": out_ffm}, write_log=False
)
Expand All @@ -70,10 +70,10 @@ def execution(self) -> None:
self.executor.run_python("example", {"in": in_mzML})

# Prepare output file for SiriusExport.
out_se = Files(["sirius-export.ms"], "ms", "sirius-export")

# Run SiriusExport tool with the collected files.
self.executor.run_topp("SiriusExport", {"in": in_mzML.collect(), "in_featureinfo": out_ffm.collect(), "out": out_se})
out_se = self.file_manager.get_files("sirius.ms", set_results_dir="sirius-export")
self.executor.run_topp("SiriusExport", {"in": self.file_manager.get_files(in_mzML, collect=True),
"in_featureinfo": self.file_manager.get_files(out_ffm, collect=True),
"out": out_se})

def results(self) -> None:
st.warning("Not implemented yet.")
15 changes: 10 additions & 5 deletions src/workflow/CommandExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import subprocess
import threading
from pathlib import Path
from .Files import Files
from .Logger import Logger
from .ParameterManager import ParameterManager
import sys
Expand Down Expand Up @@ -102,11 +101,11 @@ def run_command(self, command: list[str], write_log: bool = True) -> None:
execution_time = end_time - start_time

# Format the logging prefix
self.logger.log(f"Total time to run command: {execution_time:.2f} seconds")
self.logger.log(f"Process finished:\n"+' '.join(command)+f"\nTotal time to run command: {execution_time:.2f} seconds")

# Log stdout if present
if stdout and write_log:
self.logger.log(f"Console log:\n\n{stdout.decode()}")
self.logger.log(stdout.decode())

# Log stderr and raise an exception if errors occurred
if stderr or process.returncode != 0:
Expand All @@ -119,7 +118,13 @@ def run_topp(self, tool: str, input_output: dict, write_log: bool = True) -> Non
Constructs and executes commands for the specified OpenMS TOPP tool based on the given
input and output configurations. Ensures that all input/output file lists
are of the same length, or single strings, to maintain consistency in command
execution. Supports executing commands either as single or multiple processes
execution.
In many tools, a single input file is processed to produce a single output file.
When dealing with lists of input or output files, the convention is that
files are paired based on their order. For instance, the n-th input file is
assumed to correspond to the n-th output file, maintaining a structured
relationship between input and output data.
Supports executing commands either as single or multiple processes
based on the input size.
Args:
Expand Down Expand Up @@ -238,7 +243,7 @@ def run_python(self, script_file: str, input_output: dict = {}, write_log: bool
for k, v in params.items():
defaults[k.replace(f"{path.name}:", "")] = v
for k, v in input_output.items():
defaults[k] = v.files if isinstance(v, Files) else v
defaults[k] = v
# save parameters to temporary JSON file
tmp_params_files = Path(self.pid_dir.parent, f"{path.stem}.json")
with open(tmp_params_files, "w", encoding="utf-8") as f:
Expand Down
186 changes: 186 additions & 0 deletions src/workflow/FileManager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
from pathlib import Path
import string
import random
import shutil
from typing import Union, List
from .Logger import Logger


class FileManager:
    """
    Manages file paths for operations such as changing file extensions, organizing files
    into result directories, and handling file collections for processing tools. Designed
    to be flexible for handling both individual files and lists of files.

    Methods:
        get_files: Returns a list of file paths as strings for the specified files,
            optionally with a new file type and results subdirectory, and optionally
            collected into a single nested list (e.g. to pass to tools which can
            handle multiple input files at once).
    """

    def __init__(
        self,
        workflow_dir: Path,
    ):
        """
        Initializes the FileManager object with the current workflow directory.

        Args:
            workflow_dir (Path): The workflow directory. Result subdirectories are
                created below ``<workflow_dir>/results``.
        """
        self.workflow_dir = workflow_dir

    def get_files(
        self,
        files: Union[List[Union[str, Path]], Path, str, List[List[str]]],
        set_file_type: Union[str, None] = None,
        set_results_dir: Union[str, None] = None,
        collect: bool = False,
    ) -> Union[List[str], List[List[str]]]:
        """
        Returns a list of file paths as strings for the specified files.
        Optionally sets or changes the file extension for all files to the
        specified file type and changes the directory to a new subdirectory
        in the workflow results directory.

        Args:
            files (Union[List[Union[str, Path]], Path, str, List[List[str]]]): A single
                path (str or Path; a Path pointing to a directory is expanded to its
                entries), a list of paths, or a previously collected list (a list
                whose first entry is a list of paths).
            set_file_type (str): The file extension (without leading dot) to set
                for all files.
            set_results_dir (str): The name of a subdirectory in the workflow
                results directory to change to. If "auto" or "" a random name will
                be generated.
            collect (bool): Whether to collect all files into a single list. Will return
                a list with a single entry, which is a list of all files. Useful to pass
                to tools which can handle multiple input files at once.

        Returns:
            Union[List[str], List[List[str]]]: The (modified) files list.

        Raises:
            ValueError: If no files remain after normalizing the input.
        """
        # Handle input single string
        if isinstance(files, str):
            files = [files]
        # Handle input single Path object, can be directory or file
        elif isinstance(files, Path):
            if files.is_dir():
                files = [str(f) for f in files.iterdir()]
            else:
                files = [str(files)]
        # Handle input list
        elif isinstance(files, list):
            # Can have one entry of strings (e.g. if has been collected before by
            # FileManager). Guard against an empty list before looking at files[0].
            if files and isinstance(files[0], list):
                files = files[0]
            # Make sure every file path is a string; entries of other types are dropped
            files = [str(f) for f in files if isinstance(f, (Path, str))]

        # Raise error if no files have been detected. This has to happen before a
        # results directory is touched and before `collect` wraps the list, since
        # the wrapped `[[]]` would be truthy and hide the empty input.
        if not files:
            raise ValueError(
                f"No files found, can not set file type **{set_file_type}**, results_dir **{set_results_dir}** and collect **{collect}**."
            )

        # Set new file type if required
        if set_file_type is not None:
            files = self._set_type(files, set_file_type)
        # Set new results subdirectory if required
        if set_results_dir is not None:
            # "auto" is an alias for the empty name, which triggers a random name
            if set_results_dir == "auto":
                set_results_dir = ""
            files = self._set_dir(files, set_results_dir)
        # Collect files into a single list if required
        if collect:
            files = [files]
        return files

    @staticmethod
    def _map_paths(files: list, transform) -> list:
        """
        Applies `transform` to every path in `files` and returns a new list.

        String entries are transformed directly, list entries are transformed
        element-wise (keeping one level of nesting), and entries of any other
        type are passed through unchanged.
        """
        mapped = []
        for entry in files:
            if isinstance(entry, list):
                mapped.append([str(transform(f)) for f in entry])
            elif isinstance(entry, str):
                mapped.append(str(transform(entry)))
            else:
                mapped.append(entry)
        return mapped

    def _set_type(self, files: List[str], set_file_type: str) -> List[str]:
        """
        Sets or changes the file extension for all files in the collection to the
        specified file type.

        Args:
            files (List[str]): The list of file paths to change the type for.
            set_file_type (str): The file extension to set for all files.

        Returns:
            List[str]: The files list with new type.
        """
        new_suffix = "." + set_file_type
        return self._map_paths(files, lambda f: Path(f).with_suffix(new_suffix))

    def _set_dir(self, files: List[str], subdir_name: str) -> List[str]:
        """
        Sets the subdirectory within the results directory to store files. If the
        subdirectory name is empty, generates a random subdirectory name.
        Warns and overwrites if the subdirectory already exists.

        Args:
            files (List[str]): The list of file paths to change the directory for.
            subdir_name (str): The name of the subdirectory within the results directory.

        Returns:
            List[str]: The files list with new directory.
        """
        if subdir_name and Path(self.workflow_dir, "results", subdir_name).exists():
            Logger().log(
                f"WARNING: Subdirectory already exists, will overwrite content: {subdir_name}"
            )
        # Creates (or wipes and recreates) the directory and returns its full path
        subdir_path = self._create_results_sub_dir(subdir_name)
        # Only the file name is kept; the original parent directory is discarded
        return self._map_paths(files, lambda f: Path(subdir_path, Path(f).name))

    def _generate_random_code(self, length: int) -> str:
        """Generate a random code of the specified length.

        Args:
            length (int): Length of the random code.

        Returns:
            str: Random code of the specified length, made of ASCII letters and digits.
        """
        characters = string.ascii_letters + string.digits
        return "".join(random.choice(characters) for _ in range(length))

    def _create_results_sub_dir(self, name: str = "") -> str:
        """
        Creates a subdirectory within the results directory for storing files. If the
        name is not specified or empty, generates a random name for the subdirectory.
        An existing directory with the given name is removed and recreated empty.

        Args:
            name (str, optional): The desired name for the subdirectory.

        Returns:
            str: The path to the created subdirectory as a string.
        """
        # if name is empty string, auto generate a name
        if not name:
            name = self._generate_random_code(4)
            # make sure the subdirectory does not exist in results yet
            while Path(self.workflow_dir, "results", name).exists():
                name = self._generate_random_code(4)
        path = Path(self.workflow_dir, "results", name)
        # Start from an empty directory; a missing directory is not an error
        shutil.rmtree(path, ignore_errors=True)
        # parents=True: the "results" directory itself may not exist yet.
        # exist_ok=True: rmtree with ignore_errors may leave the directory behind.
        path.mkdir(parents=True, exist_ok=True)
        return str(path)
Loading

0 comments on commit b6030e8

Please sign in to comment.