diff --git a/content/file_upload.py b/content/file_upload.py index ec5892c..0fc0e04 100755 --- a/content/file_upload.py +++ b/content/file_upload.py @@ -3,7 +3,7 @@ import streamlit as st import pandas as pd -from src.common import page_setup, save_params, v_space, show_table +from src.common import page_setup, save_params, v_space, show_table, TK_AVAILABLE, tk_directory_dialog from src import fileupload params = page_setup() @@ -38,21 +38,45 @@ # Local file upload option: via directory path if st.session_state.location == "local": with tabs[2]: - # with st.form("local-file-upload"): - local_mzML_dir = st.text_input("path to folder with mzML files") + st_cols = st.columns([0.05, 0.95], gap="small") + with st_cols[0]: + st.write("\n") + st.write("\n") + dialog_button = st.button("📁", key='local_browse', help="Browse for your local directory with MS data.", disabled=not TK_AVAILABLE) + if dialog_button: + st.session_state["local_dir"] = tk_directory_dialog("Select directory with your MS data", st.session_state["previous_dir"]) + st.session_state["previous_dir"] = st.session_state["local_dir"] + with st_cols[1]: + # with st.form("local-file-upload"): + local_mzML_dir = st.text_input("path to folder with mzML files", value=st.session_state["local_dir"]) # raw string for file paths local_mzML_dir = rf"{local_mzML_dir}" - cols = st.columns(3) - if cols[1].button( - "Copy files to workspace", type="primary", disabled=(local_mzML_dir == "") - ): - fileupload.copy_local_mzML_files_from_directory(local_mzML_dir) + cols = st.columns([0.65, 0.3, 0.4, 0.25], gap="small") + copy_button = cols[1].button("Copy files to workspace", type="primary", disabled=(local_mzML_dir == "")) + use_copy = cols[2].checkbox("Make a copy of files", key="local_browse-copy_files", value=True, help="Create a copy of files in workspace.") + if not use_copy: + st.warning( + "**Warning**: You have deselected the `Make a copy of files` option. " + "This **_assumes you know what you are doing_**. " + "This means that the original files will be used instead. " + ) + if copy_button: + fileupload.copy_local_mzML_files_from_directory(local_mzML_dir, use_copy) mzML_dir = Path(st.session_state.workspace, "mzML-files") if any(Path(mzML_dir).iterdir()): v_space(2) # Display all mzML files currently in workspace - df = pd.DataFrame({"file name": [f.name for f in Path(mzML_dir).iterdir()]}) + df = pd.DataFrame({"file name": [f.name for f in Path(mzML_dir).iterdir() if "external_files.txt" not in f.name]}) + + # Check if local files are available + external_files = Path(mzML_dir, "external_files.txt") + if external_files.exists(): + with open(external_files, "r") as f_handle: + external_files = f_handle.readlines() + external_files = [f.strip() for f in external_files] + df = pd.concat([df, pd.DataFrame({"file name": external_files})], ignore_index=True) + st.markdown("##### mzML files in current workspace:") show_table(df) v_space(1) diff --git a/content/raw_data_viewer.py b/content/raw_data_viewer.py index fad276d..4a001d4 100755 --- a/content/raw_data_viewer.py +++ b/content/raw_data_viewer.py @@ -12,9 +12,21 @@ # File selection can not be in fragment since it influences the subsequent sections cols = st.columns(3) + +mzML_dir = Path(st.session_state.workspace, "mzML-files") +file_options = [f.name for f in mzML_dir.iterdir() if "external_files.txt" not in f.name] + +# Check if local files are available +external_files = Path(mzML_dir, "external_files.txt") +if external_files.exists(): + with open(external_files, "r") as f_handle: + external_files = f_handle.readlines() + external_files = [f.strip() for f in external_files] + file_options += external_files + selected_file = cols[0].selectbox( "choose file", - [f.name for f in Path(st.session_state.workspace, "mzML-files").iterdir()], + file_options, key="view_selected_file" ) if selected_file: diff --git a/content/run_example_workflow.py b/content/run_example_workflow.py index 4e72230..3b2f4c7 100755 --- a/content/run_example_workflow.py +++ b/content/run_example_workflow.py @@ -20,9 +20,20 @@ with st.form("workflow-with-mzML-form"): st.markdown("**Parameters**") + + file_options = [f.stem for f in Path(st.session_state.workspace, "mzML-files").glob("*.mzML") if "external_files.txt" not in f.name] + + # Check if local files are available + external_files = Path(Path(st.session_state.workspace, "mzML-files"), "external_files.txt") + if external_files.exists(): + with open(external_files, "r") as f_handle: + external_files = f_handle.readlines() + external_files = [str(Path(f.strip()).with_suffix('')) for f in external_files] + file_options += external_files + st.multiselect( "**input mzML files**", - [f.stem for f in Path(st.session_state.workspace, "mzML-files").glob("*.mzML")], + file_options, params["example-workflow-selected-mzML-files"], key="example-workflow-selected-mzML-files", ) diff --git a/src/Workflow.py b/src/Workflow.py index 3e5864f..0300a24 100644 --- a/src/Workflow.py +++ b/src/Workflow.py @@ -22,7 +22,7 @@ def upload(self) -> None: self.ui.upload_widget( key="mzML-files", name="MS data", - file_type="mzML", + file_types="mzML", fallback=[str(f) for f in Path("example-data", "mzML").glob("*.mzML")], ) diff --git a/src/common.py b/src/common.py index 7bb4ba6..58ccb43 100644 --- a/src/common.py +++ b/src/common.py @@ -9,12 +9,21 @@ import streamlit as st import pandas as pd +try: + from tkinter import Tk, filedialog + TK_AVAILABLE = True +except ImportError: + TK_AVAILABLE = False + from .captcha_ import captcha_control # set these variables according to your project APP_NAME = "OpenMS Streamlit App" REPOSITORY_NAME = "streamlit-template" +# Detect system platform +OS_PLATFORM = sys.platform + def load_params(default: bool = False) -> dict[str, Any]: """ @@ -111,6 +120,8 @@ def page_setup(page: str = "") -> dict[str, Any]: # Check location if "local" in sys.argv: st.session_state.location = "local" + st.session_state["previous_dir"] = os.getcwd() + st.session_state["local_dir"] = "" else: st.session_state.location = "online" # if we run the packaged windows version, we start within the Python directory -> need to change working directory to ..\streamlit-template @@ -353,6 +364,52 @@ def reset_directory(path: Path) -> None: shutil.rmtree(path) path.mkdir(parents=True, exist_ok=True) +def tk_directory_dialog(title: str = "Select Directory", parent_dir: str = os.getcwd()): + """ + Creates a Tkinter directory dialog for selecting a directory. + + Args: + title (str): The title of the directory dialog. + parent_dir (str): The path to the parent directory of the directory dialog. + + Returns: + str: The path to the selected directory. + + Warning: + This function is not avaliable in a streamlit cloud context. + """ + root = Tk() + root.attributes("-topmost", True) + root.withdraw() + file_path = filedialog.askdirectory(title=title, initialdir=parent_dir) + root.destroy() + return file_path + +def tk_file_dialog(title: str = "Select File", file_types: list[tuple] = [], parent_dir: str = os.getcwd(), multiple:bool = True): + """ + Creates a Tkinter file dialog for selecting a file. + + Args: + title (str): The title of the file dialog. + file_types (list(tuple)): The file types to filter the file dialog. + parent_dir (str): The path to the parent directory of the file dialog. + multiple (bool): If True, multiple files can be selected. + + Returns: + str: The path to the selected file. + + Warning: + This function is not avaliable in a streamlit cloud context. + """ + root = Tk() + root.attributes("-topmost", True) + root.withdraw() + file_types.extend([("All files", "*.*")]) + file_path = filedialog.askopenfilename( + title=title, filetypes=file_types, initialdir=parent_dir, multiple=True + ) + root.destroy() + return file_path # General warning/error messages WARNINGS = { @@ -364,3 +421,4 @@ def reset_directory(path: Path) -> None: "workflow": "Something went wrong during workflow execution.", "visualization": "Something went wrong during visualization of results.", } + diff --git a/src/fileupload.py b/src/fileupload.py index cbf5248..719fe4f 100644 --- a/src/fileupload.py +++ b/src/fileupload.py @@ -35,12 +35,13 @@ def save_uploaded_mzML(uploaded_files: list[bytes]) -> None: st.success("Successfully added uploaded files!") -def copy_local_mzML_files_from_directory(local_mzML_directory: str) -> None: +def copy_local_mzML_files_from_directory(local_mzML_directory: str, make_copy: bool=True) -> None: """ Copies local mzML files from a specified directory to the mzML directory. Args: local_mzML_directory (str): Path to the directory containing the mzML files. + make_copy (bool): Whether to make a copy of the files in the workspace. Default is True. If False, local file paths will be written to an external_files.txt file. Returns: None @@ -53,7 +54,18 @@ def copy_local_mzML_files_from_directory(local_mzML_directory: str) -> None: # Copy all mzML files to workspace mzML directory, add to selected files files = Path(local_mzML_directory).glob("*.mzML") for f in files: - shutil.copy(f, Path(mzML_dir, f.name)) + if make_copy: + shutil.copy(f, Path(mzML_dir, f.name)) + else: + # Create a temporary file to store the path to the local directories + external_files = Path(mzML_dir, "external_files.txt") + # Check if the file exists, if not create it + if not external_files.exists(): + external_files.touch() + # Write the path to the local directories to the file + with open(external_files, "a") as f_handle: + f_handle.write(f"{f}\n") + st.success("Successfully added local files!") diff --git a/src/workflow/StreamlitUI.py b/src/workflow/StreamlitUI.py index 50b3c0b..157b83b 100644 --- a/src/workflow/StreamlitUI.py +++ b/src/workflow/StreamlitUI.py @@ -5,6 +5,7 @@ import subprocess from typing import Any, Union, List import json +import os import sys import importlib.util import time @@ -12,6 +13,9 @@ import zipfile from datetime import datetime + +from src.common import OS_PLATFORM, TK_AVAILABLE, tk_directory_dialog, tk_file_dialog + class StreamlitUI: """ Provides an interface for Streamlit applications to handle file uploads, @@ -31,9 +35,9 @@ def __init__(self, workflow_dir, logger, executor, paramter_manager): def upload_widget( self, key: str, - file_type: str, + file_types: Union[str, List[str]], name: str = "", - fallback: Union[List, str] = None, + fallback: Union[List, str] = None ) -> None: """ Handles file uploads through the Streamlit interface, supporting both direct @@ -42,17 +46,19 @@ def upload_widget( Args: key (str): A unique identifier for the upload component. - file_type (str): Expected file type for the uploaded files. + file_types (Union[str, List[str]]): Expected file type(s) for the uploaded files. name (str, optional): Display name for the upload component. Defaults to the key if not provided. fallback (Union[List, str], optional): Default files to use if no files are uploaded. """ files_dir = Path(self.workflow_dir, "input-files", key) - + # create the files dir files_dir.mkdir(exist_ok=True, parents=True) # check if only fallback files are in files_dir, if yes, reset the directory before adding new files - if [Path(f).name for f in Path(files_dir).iterdir()] == [Path(f).name for f in fallback]: + if [Path(f).name for f in Path(files_dir).iterdir()] == [ + Path(f).name for f in fallback + ]: shutil.rmtree(files_dir) files_dir.mkdir() @@ -61,57 +67,152 @@ def upload_widget( c1, c2 = st.columns(2) c1.markdown("**Upload file(s)**") - with c1.form(f"{key}-upload", clear_on_submit=True): - if any(c.isupper() for c in file_type) and (c.islower() for c in file_type): - file_type_for_uploader = None - else: - file_type_for_uploader = [file_type] - files = st.file_uploader( - f"{name}", - accept_multiple_files=(st.session_state.location == "local"), - type=file_type_for_uploader, - label_visibility="collapsed", - ) - if st.form_submit_button( - f"Add **{name}**", use_container_width=True, type="primary" - ): - if files: - # in case of online mode a single file is returned -> put in list - if not isinstance(files, list): - files = [files] - for f in files: - if f.name not in [ - f.name for f in files_dir.iterdir() - ] and f.name.endswith(file_type): - with open(Path(files_dir, f.name), "wb") as fh: - fh.write(f.getbuffer()) - st.success("Successfully added uploaded files!") - else: - st.error("Nothing to add, please upload file.") - # Local file upload option: via directory path if st.session_state.location == "local": - c2.markdown("**OR copy files from local folder**") - with c2.form(f"{key}-local-file-upload"): - local_dir = st.text_input(f"path to folder with **{name}** files") + c2_text, c2_checkbox = c2.columns([1.5, 1], gap="large") + c2_text.markdown("**OR add files from local folder**") + use_copy = c2_checkbox.checkbox("Make a copy of files", key=f"{key}-copy_files", value=True, help="Create a copy of files in workspace.") + else: + use_copy = True + + # Convert file_types to a list if it's a string + if isinstance(file_types, str): + file_types = [file_types] + + if use_copy: + with c1.form(f"{key}-upload", clear_on_submit=True): + # Streamlit file uploader accepts file types as a list or None + file_type_for_uploader = file_types if file_types else None + + files = st.file_uploader( + f"{name}", + accept_multiple_files=(st.session_state.location == "local"), + type=file_type_for_uploader, + label_visibility="collapsed", + ) if st.form_submit_button( - f"Copy **{name}** files from local folder", use_container_width=True + f"Add **{name}**", use_container_width=True, type="primary" ): - # raw string for file paths - if not any(Path(local_dir).glob(f"*.{file_type}")): + if files: + # in case of online mode a single file is returned -> put in list + if not isinstance(files, list): + files = [files] + for f in files: + # Check if file type is in the list of accepted file types + if f.name not in [f.name for f in files_dir.iterdir()] and any( + f.name.endswith(ft) for ft in file_types + ): + with open(Path(files_dir, f.name), "wb") as fh: + fh.write(f.getbuffer()) + st.success("Successfully added uploaded files!") + else: + st.error("Nothing to add, please upload file.") + else: + # Create a temporary file to store the path to the local directories + external_files = Path(files_dir, "external_files.txt") + # Check if the file exists, if not create it + if not external_files.exists(): + external_files.touch() + c1.write("\n") + with c1.container(border=True): + dialog_button = st.button( + rf"$\textsf{{\Large 📁 Add }} \textsf{{ \Large \textbf{{{name}}} }}$", + type="primary", + use_container_width=True, + key="local_browse_single", + help="Browse for your local MS data files.", + disabled=not TK_AVAILABLE, + ) + + # Tk file dialog requires file types to be a list of tuples + if isinstance(file_types, str): + tk_file_types = [(f"{file_types}", f"*.{file_types}")] + elif isinstance(file_types, list): + tk_file_types = [(f"{ft}", f"*.{ft}") for ft in file_types] + else: + raise ValueError("'file_types' must be either of type str or list") + + + if dialog_button: + local_files = tk_file_dialog( + "Select your local MS data files", + tk_file_types, + st.session_state["previous_dir"], + ) + if local_files: + my_bar = st.progress(0) + for i, f in enumerate(local_files): + with open(external_files, "a") as f_handle: + f_handle.write(f"{f}\n") + my_bar.empty() + st.success("Successfully added files!") + + st.session_state["previous_dir"] = Path(local_files[0]).parent + + # Local file upload option: via directory path + if st.session_state.location == "local": + # c2_text, c2_checkbox = c2.columns([1.5, 1], gap="large") + # c2_text.markdown("**OR add files from local folder**") + # use_copy = c2_checkbox.checkbox("Make a copy of files", key=f"{key}-copy_files", value=True, help="Create a copy of files in workspace.") + with c2.container(border=True): + st_cols = st.columns([0.05, 0.55], gap="small") + with st_cols[0]: + st.write("\n") + st.write("\n") + dialog_button = st.button("📁", key='local_browse', help="Browse for your local directory with MS data.", disabled=not TK_AVAILABLE) + if dialog_button: + st.session_state["local_dir"] = tk_directory_dialog("Select directory with your MS data", st.session_state["previous_dir"]) + st.session_state["previous_dir"] = st.session_state["local_dir"] + + with st_cols[1]: + local_dir = st.text_input(f"path to folder with **{name}** files", value=st.session_state["local_dir"]) + + if c2.button(f"Add **{name}** files from local folder", use_container_width=True): + files = [] + local_dir = Path( + local_dir + ).expanduser() # Expand ~ to full home directory path + + for ft in file_types: + # Search for both files and directories with the specified extension + for path in local_dir.iterdir(): + if path.is_file() and path.name.endswith(f".{ft}"): + files.append(path) + elif path.is_dir() and path.name.endswith(f".{ft}"): + files.append(path) + + if not files: st.warning( - f"No files with type **{file_type}** found in specified folder." + f"No files with type **{', '.join(file_types)}** found in specified folder." ) else: - # Copy all mzML files to workspace mzML directory, add to selected files - files = list(Path(local_dir).glob("*.mzML")) my_bar = st.progress(0) for i, f in enumerate(files): my_bar.progress((i + 1) / len(files)) - shutil.copy(f, Path(files_dir, f.name)) + if use_copy: + if os.path.isfile(f): + shutil.copy(f, Path(files_dir, f.name)) + elif os.path.isdir(f): + shutil.copytree(f, Path(files_dir, f.name), dirs_exist_ok=True) + else: + # Write the path to the local directories to the file + with open(external_files, "a") as f_handle: + f_handle.write(f"{f}\n") my_bar.empty() st.success("Successfully copied files!") + if not TK_AVAILABLE: + c2.warning("**Warning**: Failed to import tkinter, either it is not installed, or this is being called from a cloud context. " "This function is not available in a Streamlit Cloud context. " + "You will have to manually enter the path to the folder with the MS files." + ) + + if not use_copy: + c2.warning( + "**Warning**: You have deselected the `Make a copy of files` option. " + "This **_assumes you know what you are doing_**. " + "This means that the original files will be used instead. " + ) + if fallback and not any(Path(files_dir).iterdir()): if isinstance(fallback, str): fallback = [fallback] @@ -127,7 +228,16 @@ def upload_widget( ] else: if files_dir.exists(): - current_files = [f.name for f in files_dir.iterdir()] + current_files = [f.name for f in files_dir.iterdir() if "external_files.txt" not in f.name] + + # Check if local files are available + external_files = Path(self.workflow_dir, "input-files", key, "external_files.txt") + + if external_files.exists(): + with open(external_files, "r") as f: + external_files_list = f.read().splitlines() + # Only make files available that still exist + current_files += [f"(local) {Path(f).name}" for f in external_files_list if os.path.exists(f)] else: current_files = [] @@ -144,7 +254,9 @@ def upload_widget( ): shutil.rmtree(files_dir) del self.params[key] - with open(self.parameter_manager.params_file, "w", encoding="utf-8") as f: + with open( + self.parameter_manager.params_file, "w", encoding="utf-8" + ) as f: json.dump(self.params, f, indent=4) st.rerun() elif not fallback: @@ -173,7 +285,16 @@ def select_input_file( if not path.exists(): st.warning(f"No **{name}** files!") return - options = [str(f) for f in path.iterdir()] + options = [str(f) for f in path.iterdir() if "external_files.txt" not in str(f)] + + # Check if local files are available + external_files = Path(self.workflow_dir, "input-files", key, "external_files.txt") + + if external_files.exists(): + with open(external_files, "r") as f: + external_files_list = f.read().splitlines() + # Only make files available that still exist + options += [f for f in external_files_list if os.path.exists(f)] if (key in self.params.keys()) and isinstance(self.params[key], list): self.params[key] = [f for f in self.params[key] if f in options] @@ -398,7 +519,6 @@ def input_TOPP( if encoded_key in param.keys(): param.setValue(encoded_key, value) poms.ParamXMLFile().store(str(ini_file_path), param) - # read into Param object param = poms.Param() @@ -431,7 +551,7 @@ def input_TOPP( "section_description": param.getSectionDescription(':'.join(key.decode().split(':')[:-1])) } params_decoded.append(tmp) - + # for each parameter in params_decoded # if a parameter with custom default value exists, use that value # else check if the parameter is already in self.params, if yes take the value from self.params @@ -449,7 +569,7 @@ def input_TOPP( section_description = None cols = st.columns(num_cols) i = 0 - + for p in params_decoded: # skip avdanced parameters if not selected if not st.session_state["advanced"] and p["advanced"]: @@ -457,11 +577,11 @@ def input_TOPP( key = f"{self.parameter_manager.topp_param_prefix}{p['key'].decode()}" if display_subsections: - name = p["name"] - if section_description is None: + name = p["name"] + if section_description is None: section_description = p['section_description'] - - elif section_description != p['section_description']: + + elif section_description != p['section_description']: section_description = p['section_description'] st.markdown(f"**{section_description}**") cols = st.columns(num_cols) @@ -638,7 +758,7 @@ def zip_and_download_files(self, directory: str): Args: directory (str): The directory whose files are to be zipped. """ - # Ensure directory is a Path object and check if directory is empty + # Ensure directory is a Path object and check if directory is empty directory = Path(directory) if not any(directory.iterdir()): st.error("No files to compress.") @@ -675,8 +795,7 @@ def zip_and_download_files(self, directory: str): mime="application/zip", use_container_width=True ) - - + def file_upload_section(self, custom_upload_function) -> None: custom_upload_function() c1, _ = st.columns(2)