diff --git a/CHANGELOG.md b/CHANGELOG.md
index 18f4ba8..8d5be93 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,11 +1,22 @@
 ## RENEE development version

-- Minor documentation improvements. (#132, #135, @kelly-sovacool)
+### New features
+
 - Support hg38 release 45 on biowulf & FRCE. (#127, @kelly-sovacool)
-- Show the name of the pipeline rather than the python script for CLI help messages. (#131, @kelly-sovacool)
-- Add GUI instructions to the documentation website. (#38, @samarth8392)
+- Set default shared singularity SIF directory for biowulf and FRCE. (#94, @kelly-sovacool)
+- Add `renee gui` subcommand to launch the graphical user interface. (#94, @kelly-sovacool)
+  - Previously, `renee_gui` (with an underscore) was a command in the `ccbrpipeliner` module.
+
+### Bug fixes
+
 - Ensure `renee build` creates necessary `config` directory during initialization. (#139, @kelly-sovacool)
+
+### Documentation updates
+
+- Minor documentation improvements. (#132, #135, @kelly-sovacool)
+- Add GUI instructions to the documentation website. (#38, @samarth8392)
 - The docs website now has a dropdown menu to select which version to view. The latest release is shown by default. (#150, @kelly-sovacool)
+- Show the name of the pipeline rather than the python script for CLI help messages. (#131, @kelly-sovacool)

 ## RENEE 2.5.12
diff --git a/bin/redirect b/bin/redirect
index 824d007..99a1086 100755
--- a/bin/redirect
+++ b/bin/redirect
@@ -67,4 +67,4 @@ elif [[ $ISFRCE == true ]];then
     export PATH="/mnt/projects/CCBR-Pipelines/bin:$PATH"
 fi

-${TOOLDIR}/src/renee/__main__.py "$@" || true
+${TOOLDIR}/main.py "$@" || true
diff --git a/main.py b/main.py
new file mode 100755
index 0000000..716c3d8
--- /dev/null
+++ b/main.py
@@ -0,0 +1,16 @@
+#!/usr/bin/env python
+import os
+import re
+import sys
+from src.renee.__main__ import main
+
+# add script directory to the path to allow the CLI to work out-of-the-box
+# without the need to install it via pip first
+SCRIPT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "src", "renee")
+sys.path.append(SCRIPT_DIR)
+
+if (
+    __name__ == "__main__"
+):  # this block is adapted from the executable file created by `pip install`
+    sys.argv[0] = re.sub(r"(-script\.pyw|\.exe)?$", "", sys.argv[0])
+    sys.exit(main())
diff --git a/pyproject.toml b/pyproject.toml
index c4d2726..953aaec 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -33,12 +33,13 @@ classifiers = [
     "Programming Language :: Python :: 3.9",
     "Topic :: Scientific/Engineering :: Bio-Informatics",
 ]
-requires-python = ">=3.8"
+requires-python = ">=3.11"
 dependencies = [
+    "argparse",
     "Click >= 8.1.3",
+    "PySimpleGui < 5",
     "snakemake >= 7, < 8",
     "snaketool-utils >= 0.0.5",
-    "argparse"
 ]

 [project.optional-dependencies]
diff --git a/resources/CCBRlogo.png b/resources/CCBRlogo.png
new file mode 100644
index 0000000..256ea29
Binary files /dev/null and b/resources/CCBRlogo.png differ
diff --git a/resources/nih-bnfo-logo.png b/resources/nih-bnfo-logo.png
deleted file mode 100755
index f31bdf2..0000000
Binary files a/resources/nih-bnfo-logo.png and /dev/null differ
diff --git a/src/renee/__main__.py b/src/renee/__main__.py
index bda9af3..2a46b9c 100755
--- a/src/renee/__main__.py
+++ b/src/renee/__main__.py
@@ -2,7 +2,7 @@
 # -*- coding: UTF-8 -*-

 """RENEE: Rna sEquencing aNalysis pipElinE:
-An highly reproducible and portable RNA-seq data analysises pipeline
+A highly reproducible and portable RNA-seq data analysis pipeline
 About:
     This is the main entry for the RENEE pipeline.
 USAGE:
@@ -13,101 +13,41 @@

 # Python standard library
 from __future__ import print_function
-from shutil import copy, copytree
-import sys, os, subprocess, re, json, textwrap, shlex, glob
-from pathlib import Path
-from datetime import datetime
-import warnings
+from shutil import copy
+import json
+import os
+import subprocess
+import sys
+import textwrap

 # 3rd party imports from pypi
-import argparse  # potential python3 3rd party package, added in python/3.5
-
+import argparse
+
+# local imports
+from .cache import get_sif_cache_dir
+from .run import run
+from .dryrun import dryrun
+from .gui import launch_gui
+from .conditions import fatal
+from .util import (
+    get_hpcname,
+    get_tmp_dir,
+    get_genomes_list,
+    get_version,
+    check_python_version,
+    _cp_r_safe_,
+    orchestrate,
+)

 # Pipeline Metadata and globals
-def renee_base(rel_path):
-    basedir = os.path.dirname(
-        os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
-    )
-    return os.path.join(basedir, rel_path)
-
-
 RENEE_PATH = os.path.dirname(
     os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 )
-
-with open(renee_base("VERSION"), "r") as vfile:
-    __version__ = f"v{vfile.read().strip()}"
-
+__version__ = get_version()
 __home__ = os.path.dirname(os.path.abspath(__file__))
 _name = os.path.basename(sys.argv[0])
 _description = "a highly-reproducible RNA-seq pipeline"
-
-# check python version ... should be 3.7 or newer
-MIN_PYTHON = (3, 7)
-try:
-    assert sys.version_info >= MIN_PYTHON
-except AssertionError:
-    exit(
-        f"{sys.argv[0]} requires Python {'.'.join([str(n) for n in MIN_PYTHON])} or newer"
-    )
-
-
-def scontrol_show():
-    """Run scontrol show config and parse the output as a dictionary
-    @return scontrol_dict <dict>:
-    """
-    scontrol_dict = dict()
-    scontrol_out = subprocess.run(
-        "scontrol show config", shell=True, capture_output=True, text=True
-    ).stdout
-    if len(scontrol_out) > 0:
-        for line in scontrol_out.split("\n"):
-            line_split = line.split("=")
-            if len(line_split) > 1:
-                scontrol_dict[line_split[0].strip()] = line_split[1].strip()
-    return scontrol_dict
-
-
-def get_hpcname():
-    """Get the HPC name (biowulf, frce, or an empty string)
-    @return hpcname <str>
-    """
-    scontrol_out = scontrol_show()
-    hpc = scontrol_out["ClusterName"] if "ClusterName" in scontrol_out.keys() else ""
-    if hpc == "fnlcr":
-        hpc = "frce"
-    return hpc
-
-
-def get_tmp_dir(tmp_dir, outdir):
-    """Get default temporary directory for biowulf and frce. Allow user override."""
-    hpc = get_hpcname()
-    if not tmp_dir:
-        if hpc == "biowulf":
-            tmp_dir = "/lscratch/$SLURM_JOBID"
-        elif hpc == "frce":
-            tmp_dir = outdir
-        else:
-            tmp_dir = None
-    return tmp_dir
-
-
-def get_genomes_list(renee_path, hpcname=get_hpcname()):
-    """Get list of genome annotations available for the current platform
-    @return genomes_list <list[str]>
-    """
-    genome_config_dir = os.path.join(renee_path, "config", "genomes", hpcname)
-    json_files = glob.glob(genome_config_dir + "/*.json")
-    if not json_files:
-        warnings.warn(
-            f"WARNING: No Genome Annotation JSONs found in {genome_config_dir}. Please specify a custom genome json file with `--genome`"
-        )
-    genomes = [os.path.basename(file).replace(".json", "") for file in json_files]
-    return sorted(genomes)
-
-
-# Get list of prebuilt genome annotations available for the platform
-GENOMES_LIST = get_genomes_list(RENEE_PATH)
+check_python_version()


 class Colors:
@@ -144,79 +84,6 @@ class Colors:
     bg_white = "\33[47m"


-def err(*message, **kwargs):
-    """Prints any provided args to standard error.
-    kwargs can be provided to modify print functions
-    behavior.
-    @param message <any>:
-        Values printed to standard error
-    @params kwargs <print()>
-        Key words to modify print function behavior
-    """
-    print(*message, file=sys.stderr, **kwargs)
-
-
-def fatal(*message, **kwargs):
-    """Prints any provided args to standard error
-    and exits with an exit code of 1.
-    @param message <any>:
-        Values printed to standard error
-    @params kwargs <print()>
-        Key words to modify print function behavior
-    """
-    err(*message, **kwargs)
-    sys.exit(1)
-
-
-def _now():
-    ct = datetime.now()
-    now = ct.strftime("%y%m%d%H%M%S")
-    return now
-
-
-def _get_file_mtime(f):
-    timestamp = datetime.fromtimestamp(os.path.getmtime(os.path.abspath(f)))
-    mtime = timestamp.strftime("%y%m%d%H%M%S")
-    return mtime
-
-
-def exists(testpath):
-    """Checks if file exists on the local filesystem.
-    @param parser <argparse.ArgumentParser() object>:
-        argparse parser object
-    @param testpath <str>:
-        Name of file/directory to check
-    @return does_exist <boolean>:
-        True when file/directory exists, False when file/directory does not exist
-    """
-    does_exist = True
-    if not os.path.exists(testpath):
-        does_exist = False  # File or directory does not exist on the filesystem
-
-    return does_exist
-
-
-def exe_in_path(cmd, path=None):
-    """Checks if an executable is in $PATH
-    @param cmd <str>:
-        Name of executable to check
-    @param path <list[str]>:
-        Optional list of PATHs to check [default: $PATH]
-    @return <boolean>:
-        True if exe in PATH, False if not in PATH
-    """
-    if path is None:
-        path = os.environ["PATH"].split(os.pathsep)
-
-    for prefix in path:
-        filename = os.path.join(prefix, cmd)
-        executable = os.access(filename, os.X_OK)
-        is_not_directory = os.path.isfile(filename)
-        if executable and is_not_directory:
-            return True
-    return False
-
-
 def permissions(parser, filename, *args, **kwargs):
     """Checks permissions using os.access() to see the user is authorized to access
     a file/directory. Checks for existence, readability, writability and executability via:
@@ -228,7 +95,7 @@ def permissions(parser, filename, *args, **kwargs):
     @return filename <str>:
         If file exists and user can read from file
     """
-    if not exists(filename):
+    if not os.path.exists(filename):
         parser.error(
             "File '{}' does not exists! Failed to provide valid input.".format(filename)
         )
@@ -254,7 +121,7 @@ def check_cache(parser, cache, *args, **kwargs):
     @return cache <str>:
         If singularity cache dir is valid
     """
-    if not exists(cache):
+    if not os.path.exists(cache):
         # Cache directory does not exist on filesystem
         os.makedirs(cache)
     elif os.path.isfile(cache):
@@ -272,7 +139,7 @@ def check_cache(parser, cache, *args, **kwargs):
     # Check that the user owns the child cache directory
     # May revert to os.getuid() if user id is not sufficient
     if (
-        exists(os.path.join(cache, "cache"))
+        os.path.exists(os.path.join(cache, "cache"))
         and os.stat(os.path.join(cache, "cache")).st_uid != os.getuid()
     ):
         # User does NOT own the cache directory, raise error
@@ -289,944 +156,6 @@ def check_cache(parser, cache, *args, **kwargs):
     return cache


-def _cp_r_safe_(
-    source, target, resources=["workflow", "resources", "config"], safe_mode=True
-):
-    """Private function: Given a list paths it will recursively copy each to the
-    target location. If a target path already exists, it will not over-write the
-    existing paths data when `safe_mode` is on.
-    @param resources <list[str]>:
-        List of paths to copy over to target location.
- Default: ["workflow", "resources", "config"] - @params source : - Add a prefix PATH to each resource - @param target : - Target path to copy templates and required resources (aka destination) - @param safe_mode : - Only copy the resources to the target path - if they do not exist in the target path (default: True) - """ - for resource in resources: - destination = os.path.join(target, resource) - if os.path.exists(destination) and safe_mode: - print(f"🚫 path exists and `safe_mode` is ON, not copying: {destination}") - else: - # Required resources do not exist, or safe mode is off - copytree( - os.path.join(source, resource), destination, dirs_exist_ok=not safe_mode - ) - - -def rename(filename): - """Dynamically renames FastQ file to have one of the following extensions: *.R1.fastq.gz, *.R2.fastq.gz - To automatically rename the fastq files, a few assumptions are made. If the extension of the - FastQ file cannot be inferred, an exception is raised telling the user to fix the filename - of the fastq files. - @param filename : - Original name of file to be renamed - @return filename : - A renamed FastQ filename - """ - # Covers common extensions from SF, SRA, EBI, TCGA, and external sequencing providers - # key = regex to match string and value = how it will be renamed - extensions = { - # Matches: _R[12]_fastq.gz, _R[12].fastq.gz, _R[12]_fq.gz, etc. - ".R1.f(ast)?q.gz$": ".R1.fastq.gz", - ".R2.f(ast)?q.gz$": ".R2.fastq.gz", - # Matches: _R[12]_001_fastq_gz, _R[12].001.fastq.gz, _R[12]_001.fq.gz, etc. - # Capture lane information as named group - ".R1.(?P...).f(ast)?q.gz$": ".R1.fastq.gz", - ".R2.(?P...).f(ast)?q.gz$": ".R2.fastq.gz", - # Matches: _[12].fastq.gz, _[12].fq.gz, _[12]_fastq_gz, etc. - "_1.f(ast)?q.gz$": ".R1.fastq.gz", - "_2.f(ast)?q.gz$": ".R2.fastq.gz", - } - - if filename.endswith(".R1.fastq.gz") or filename.endswith(".R2.fastq.gz"): - # Filename is already in the correct format - return filename - - converted = False - for regex, new_ext in extensions.items(): - matched = re.search(regex, filename) - if matched: - # regex matches with a pattern in extensions - converted = True - # Try to get substring for named group lane, retain this in new file extension - # Come back to this later, I am not sure if this is necessary - # That string maybe static (i.e. always the same) - # https://support.illumina.com/help/BaseSpace_OLH_009008/Content/Source/Informatics/BS/NamingConvention_FASTQ-files-swBS.htm# - try: - new_ext = "_{}{}".format(matched.group("lane"), new_ext) - except IndexError: - pass # Does not contain the named group lane - - filename = re.sub(regex, new_ext, filename) - break # only rename once - - if not converted: - raise NameError( - """\n\tFatal: Failed to rename provided input '{}'! - Cannot determine the extension of the user provided input file. - Please rename the file list above before trying again. - Here is example of acceptable input file extensions: - sampleName.R1.fastq.gz sampleName.R2.fastq.gz - sampleName_R1_001.fastq.gz sampleName_R2_001.fastq.gz - sampleName_1.fastq.gz sampleName_2.fastq.gz - Please also check that your input files are gzipped? - If they are not, please gzip them before proceeding again. - """.format( - filename, sys.argv[0] - ) - ) - - return filename - - -def _sym_safe_(input_data, target): - """Creates re-named symlinks for each FastQ file provided - as input. If a symlink already exists, it will not try to create a new symlink. - If relative source PATH is provided, it will be converted to an absolute PATH. 
-    @param input_data <list[<str>]>:
-        List of input files to symlink to target location
-    @param target <str>:
-        Target path to copy templates and required resources
-    @return input_fastqs list[<str>]:
-        List of renamed input FastQs
-    """
-    input_fastqs = []  # store renamed fastq file names
-    for file in input_data:
-        filename = os.path.basename(file)
-        renamed = os.path.join(target, rename(filename))
-        input_fastqs.append(renamed)
-
-        if not exists(renamed):
-            # Create a symlink if it does not already exist
-            # Follow source symlinks to resolve any binding issues
-            os.symlink(os.path.abspath(os.path.realpath(file)), renamed)
-
-    return input_fastqs
-
-
-def initialize(sub_args, repo_path, output_path):
-    """Initialize the output directory and copy over required pipeline resources.
-    If user provides a output directory path that already exists on the filesystem
-    as a file (small chance of happening but possible), a OSError is raised. If the
-    output directory PATH already EXISTS, it will not try to create the directory.
-    If a resource also already exists in the output directory (i.e. output/workflow),
-    it will not try to copy over that directory. In the future, it maybe worth adding
-    an optional cli arg called --force, that can modify this behavior. Returns a list
-    of renamed FastQ files (i.e. renamed symlinks).
-    @param sub_args <parser.parse_args() object>:
-        Parsed arguments for run sub-command
-    @param repo_path <str>:
-        Path to RENEE source code and its templates
-    @param output_path <str>:
-        Pipeline output path, created if it does not exist
-    @return inputs list[<str>]:
-        List of pipeline's input FastQ files
-    """
-    if not exists(output_path):
-        # Pipeline output directory does not exist on filesystem
-        os.makedirs(output_path)
-
-    elif exists(output_path) and os.path.isfile(output_path):
-        # Provided Path for pipeline output directory exists as file
-        raise OSError(
-            """\n\tFatal: Failed to create provided pipeline output directory!
-            User provided --output PATH already exists on the filesystem as a file.
-            Please run {} again with a different --output PATH.
-            """.format(
-                sys.argv[0]
-            )
-        )
-
-    # Copy over templates are other required resources
-    _cp_r_safe_(
-        source=repo_path,
-        target=output_path,
-        resources=["workflow", "resources", "config"],
-    )
-
-    # Create renamed symlinks to rawdata
-    inputs = _sym_safe_(input_data=sub_args.input, target=output_path)
-
-    return inputs
-
-
-def join_jsons(templates):
-    """Joins multiple JSON files to into one data structure
-    Used to join multiple template JSON files to create a global config dictionary.
-    @params templates <list[str]>:
-        List of template JSON files to join together
-    @return aggregated <dict>:
-        Dictionary containing the contents of all the input JSON files
-    """
-    # Get absolute PATH to templates in renee git repo
-    repo_path = os.path.dirname(os.path.abspath(__file__))
-    aggregated = {}
-
-    for file in templates:
-        with open(os.path.join(repo_path, file), "r") as fh:
-            aggregated.update(json.load(fh))
-
-    return aggregated
-
-
-def add_user_information(config):
-    """Adds username and user's home directory to config.
-    @params config <dict>:
-        Config dictionary containing metadata to run pipeline
-    @return config <dict>:
-        Updated config dictionary containing user information (username and home directory)
-    """
-    # Get PATH to user's home directory
-    # Method is portable across unix-like OS and Windows
-    home = os.path.expanduser("~")
-
-    # Get username from home directory PATH
-    username = os.path.split(home)[-1]
-
-    # Update config with home directory and username
-    config["project"]["userhome"] = home
-    config["project"]["username"] = username
-
-    return config
-
-
-def get_nends(ifiles):
-    """Determines whether the dataset is paired-end or single-end.
-    If paired-end data, checks to see if both mates (R1 and R2) are present for each sample.
-    If single-end, nends is set to 1. Else if paired-end, nends is set to 2.
-    @params ifiles list[<str>]:
-        List containing pipeline input files (renamed symlinks)
-    @return nends_status <int>:
-        Integer reflecting nends status: 1 = se, 2 = pe
-    """
-    # Determine if dataset contains paired-end data
-    paired_end = False
-    nends_status = 1
-    for file in ifiles:
-        if file.endswith(".R2.fastq.gz"):
-            paired_end = True
-            nends_status = 2
-            break  # dataset is paired-end
-
-    # Check to see if both mates (R1 and R2) are present paired-end data
-    if paired_end:
-        nends = {}  # keep count of R1 and R2 for each sample
-        for file in ifiles:
-            # Split sample name on file extension
-            sample = re.split("\.R[12]\.fastq\.gz", os.path.basename(file))[0]
-            if sample not in nends:
-                nends[sample] = 0
-
-            nends[sample] += 1
-
-        # Check if samples contain both read mates
-        missing_mates = [sample for sample, count in nends.items() if count == 1]
-        if missing_mates:
-            # Missing an R1 or R2 for a provided input sample
-            raise NameError(
-                """\n\tFatal: Detected pair-end data but user failed to provide
-                both mates (R1 and R2) for the following samples:\n\t\t{}\n
-                Please check that the basename for each sample is consistent across mates.
-                Here is an example of a consistent basename across mates:
-                  consistent_basename.R1.fastq.gz
-                  consistent_basename.R2.fastq.gz
-
-                Please do not run the pipeline with a mixture of single-end and paired-end
-                samples. This feature is currently not supported within {}, and it is
-                not recommended either. If this is a priority for your project, please run
-                paired-end samples and single-end samples separately (in two separate output directories).
-                If you feel like this functionality should exist, feel free to open an issue on Github.
-                """.format(
-                    missing_mates, sys.argv[0]
-                )
-            )
-
-    return nends_status
-
-
-def get_fastq_screen_paths(fastq_screen_confs, match="DATABASE", file_index=-1):
-    """Parses fastq_screen.conf files to get the paths of each fastq_screen database.
-    This path contains bowtie2 indices for reference genome to screen against.
-    The paths are added as singularity bind points.
-    @param fastq_screen_confs list[<str>]:
-        Name of fastq_screen config files to parse
-    @param match <string>:
-        Keyword to indicate a line match [default: 'DATABASE']
-    @param file_index <int>:
-        Index of line line containing the fastq_screen database path
-    @return list[<str>]:
-        Returns a list of fastq_screen database paths
-    """
-    databases = []
-    for file in fastq_screen_confs:
-        with open(file, "r") as fh:
-            for line in fh:
-                if line.startswith(match):
-                    db_path = line.strip().split()[file_index]
-                    databases.append(db_path)
-    return databases
-
-
-def get_rawdata_bind_paths(input_files):
-    """
-    Gets rawdata bind paths of user provided fastq files.
-    @params input_files list[<str>]:
-        List containing user-provided input fastq files
-    @return bindpaths <set>:
-        Set of rawdata bind paths
-    """
-    bindpaths = []
-    for file in input_files:
-        # Get directory of input file
-        rawdata_src_path = os.path.dirname(os.path.abspath(os.path.realpath(file)))
-        if rawdata_src_path not in bindpaths:
-            bindpaths.append(rawdata_src_path)
-
-    return bindpaths
-
-
-def add_sample_metadata(input_files, config, group=None):
-    """Adds sample metadata such as sample basename, label, and group information.
-    If sample sheet is provided, it will default to using information in that file.
-    If no sample sheet is provided, it will only add sample basenames and labels.
-    @params input_files list[<str>]:
-        List containing pipeline input fastq files
-    @params config <dict>:
-        Config dictionary containing metadata to run pipeline
-    @params group <str>:
-        Sample sheet containing basename, group, and label for each sample
-    @return config <dict>:
-        Updated config with basenames, labels, and groups (if provided)
-    """
-    # TODO: Add functionality for basecase when user has samplesheet
-    added = []
-    for file in input_files:
-        # Split sample name on file extension
-        sample = re.split("\.R[12]\.fastq\.gz", os.path.basename(file))[0]
-        if sample not in added:
-            # Only add PE sample information once
-            added.append(sample)
-            config["project"]["groups"]["rsamps"].append(sample)
-            config["project"]["groups"]["rgroups"].append(sample)
-            config["project"]["groups"]["rlabels"].append(sample)
-
-    return config
-
-
-def add_rawdata_information(sub_args, config, ifiles):
-    """Adds information about rawdata provided to pipeline.
-    Determines whether the dataset is paired-end or single-end and finds the set of all
-    rawdata directories (needed for -B option when running singularity). If a user provides
-    paired-end data, checks to see if both mates (R1 and R2) are present for each sample.
-    @param sub_args <parser.parse_args() object>:
-        Parsed arguments for run sub-command
-    @params ifiles list[<str>]:
-        List containing pipeline input files (renamed symlinks)
-    @params config <dict>:
-        Config dictionary containing metadata to run pipeline
-    @return config <dict>:
-        Updated config dictionary containing user information (username and home directory)
-    """
-    # Determine whether dataset is paired-end or single-ends
-    # Updates config['project']['nends']: 1 = single-end, 2 = paired-end
-    nends = get_nends(ifiles)  # Checks PE data for both mates (R1 and R2)
-    config["project"]["nends"] = nends
-
-    # Finds the set of rawdata directories to bind
-    rawdata_paths = get_rawdata_bind_paths(input_files=sub_args.input)
-    config["project"]["datapath"] = ",".join(rawdata_paths)
-
-    # Add each sample's basename, label and group info
-    config = add_sample_metadata(input_files=ifiles, config=config)
-
-    return config
-
-
-def image_cache(sub_args, config):
-    """Adds Docker Image URIs, or SIF paths to config if singularity cache option is provided.
-    If singularity cache option is provided and a local SIF does not exist, a warning is
-    displayed and the image will be pulled from URI in 'config/containers/images.json'.
-    @param sub_args <parser.parse_args() object>:
-        Parsed arguments for run sub-command
-    @params config <file>:
-        Docker Image config file
-    @return config <dict>:
-        Updated config dictionary containing user information (username and home directory)
-    """
-    # Get absolute PATH to templates in renee git repo
-    repo_path = os.path.dirname(os.path.abspath(__file__))
-    images = os.path.join(sub_args.output, "config", "containers", "images.json")
-
-    # Read in config for docker image uris
-    with open(images, "r") as fh:
-        data = json.load(fh)
-    # Check if local sif exists
-    for image, uri in data["images"].items():
-        if sub_args.sif_cache:
-            sif = os.path.join(
-                sub_args.sif_cache,
-                "{}.sif".format(os.path.basename(uri).replace(":", "_")),
-            )
-            if not exists(sif):
-                # If local sif does not exist on in cache, print warning
-                # and default to pulling from URI in config/containers/images.json
-                print(
-                    'Warning: Local image "{}" does not exist in singularity cache'.format(
-                        sif
-                    ),
-                    file=sys.stderr,
-                )
-            else:
-                # Change pointer to image from Registry URI to local SIF
-                data["images"][image] = sif
-
-    config.update(data)
-
-    return config
-
-
-def get_repo_git_commit_hash(repo_path):
-    """Gets the git commit hash of the RENEE repo.
-    @param repo_path <str>:
-        Path to RENEE git repo
-    @return githash <str>:
-        Latest git commit hash
-    """
-    try:
-        githash = (
-            subprocess.check_output(
-                ["git", "rev-parse", "HEAD"], stderr=subprocess.STDOUT, cwd=repo_path
-            )
-            .strip()
-            .decode("utf-8")
-        )
-        # Typecast to fix python3 TypeError (Object of type bytes is not JSON serializable)
-        # subprocess.check_output() returns a byte string
-        githash = str(githash)
-    except Exception as e:
-        # Github releases are missing the .git directory,
-        # meaning you cannot get a commit hash, set the
-        # commit hash to indicate its from a GH release
-        githash = "github_release"
-
-    return githash
-
-
-def setup(sub_args, ifiles, repo_path, output_path):
-    """Setup the pipeline for execution and creates config file from templates
-    @param sub_args <parser.parse_args() object>:
-        Parsed arguments for run sub-command
-    @param repo_path <str>:
-        Path to RENEE source code and its templates
-    @param output_path <str>:
-        Pipeline output path, created if it does not exist
-    @return config <dict>:
-        Config dictionary containing metadata to run the pipeline
-    @return hpcname <str>:
-    """
-    # Resolves PATH to template for genomic reference files to select from a
-    # bundled reference genome or a user generated reference genome built via
-    # renee build subcommand
-    hpcname = get_hpcname()
-    if hpcname == "biowulf":
-        print("Thank you for running RENEE on BIOWULF!")
-        genome_config = os.path.join(
-            output_path, "config", "genomes", hpcname, sub_args.genome + ".json"
-        )
-    elif hpcname == "frce":
-        print("Thank you for running RENEE on FRCE!")
-        genome_config = os.path.join(
-            output_path, "config", "genomes", hpcname, sub_args.genome + ".json"
-        )
-    else:
-        genome_config = os.path.join(
-            output_path, "config", "genomes", sub_args.genome + ".json"
-        )
-    if sub_args.genome.endswith(".json"):
-        # Provided a custom reference genome generated by renee build
-        genome_config = os.path.abspath(sub_args.genome)
-
-    required = {
-        # Template for project-level information
-        "project": os.path.join(output_path, "config", "templates", "project.json"),
-        # Template for genomic reference files
-        # User provided argument --genome is used to select the template
-        "genome": genome_config,
-        # Template for tool information
-        "tools": os.path.join(output_path, "config", "templates", "tools.json"),
-    }
-
-    # Global config file for pipeline, config.json
-    config = join_jsons(required.values())  # uses templates in the renee repo
-    # Update cluster-specific paths for fastq screen & kraken db
-    if hpcname == "biowulf" or hpcname == "frce":
-        db_json_filename = os.path.join(
-            output_path, "config", "templates", f"dbs_{hpcname}.json"
-        )
-        with open(
-            os.path.join(os.path.dirname(os.path.abspath(__file__)), db_json_filename),
-            "r",
-        ) as json_file:
-            config["bin"]["rnaseq"]["tool_parameters"].update(json.load(json_file))
-
-    config = add_user_information(config)
-    config = add_rawdata_information(sub_args, config, ifiles)
-
-    # Resolves if an image needs to be pulled from an OCI registry or
-    # a local SIF generated from the renee cache subcommand exists
-    config = image_cache(sub_args, config)
-
-    # Add other cli collected info
-    config["project"]["annotation"] = sub_args.genome
-    config["project"]["version"] = __version__
-    config["project"]["pipelinehome"] = os.path.dirname(__file__)
-    config["project"]["workpath"] = os.path.abspath(sub_args.output)
-    genome_annotation = sub_args.genome
-    config["project"]["organism"] = genome_annotation.split("_")[0]
-
-    # Add optional cli workflow steps
-    config["options"] = {}
-    config["options"]["star_2_pass_basic"] = sub_args.star_2_pass_basic
-    config["options"]["small_rna"] = sub_args.small_rna
-    config["options"]["tmp_dir"] = get_tmp_dir(sub_args.tmp_dir, output_path)
-    config["options"]["shared_resources"] = sub_args.shared_resources
-    if sub_args.wait:
-        config["options"]["wait"] = "True"
-    else:
-        config["options"]["wait"] = "False"
-    if sub_args.create_nidap_folder:
-        config["options"]["create_nidap_folder"] = "True"
-    else:
-        config["options"]["create_nidap_folder"] = "False"
-
-    # Get latest git commit hash
-    git_hash = get_repo_git_commit_hash(repo_path)
-    config["project"]["git_commit_hash"] = git_hash
-
-    if sub_args.shared_resources:
-        # Update paths to shared resources directory
-        config["bin"]["rnaseq"]["tool_parameters"][
-            "FASTQ_SCREEN_CONFIG"
-        ] = os.path.join(
-            sub_args.shared_resources, "fastq_screen_db", "fastq_screen.conf"
-        )
-        config["bin"]["rnaseq"]["tool_parameters"][
-            "FASTQ_SCREEN_CONFIG2"
-        ] = os.path.join(
-            sub_args.shared_resources, "fastq_screen_db", "fastq_screen_2.conf"
-        )
-        config["bin"]["rnaseq"]["tool_parameters"]["KRAKENBACDB"] = os.path.join(
-            sub_args.shared_resources, "20180907_standard_kraken2"
-        )
-
-    # Save config to output directory
-    print(
-        "\nGenerating config file in '{}'... ".format(
-            os.path.join(output_path, "config.json")
-        ),
-        end="",
-    )
-    with open(os.path.join(output_path, "config.json"), "w") as fh:
-        json.dump(config, fh, indent=4, sort_keys=True)
-    print("Done!")
-
-    return config
-
-
-def dryrun(
-    outdir,
-    config="config.json",
-    snakefile=os.path.join("workflow", "Snakefile"),
-    write_to_file=True,
-):
-    """Dryruns the pipeline to ensure there are no errors prior to running.
-    @param outdir <str>:
-        Pipeline output PATH
-    @return dryrun_output <str>:
-        Byte string representation of dryrun command
-    """
-    try:
-        dryrun_output = subprocess.check_output(
-            [
-                "snakemake",
-                "-npr",
-                "-s",
-                str(snakefile),
-                "--use-singularity",
-                "--rerun-incomplete",
-                "--cores",
-                "4",
-                "--configfile={}".format(config),
-            ],
-            cwd=outdir,
-            stderr=subprocess.STDOUT,
-        )
-
-    except subprocess.CalledProcessError as e:
-        # Singularity is NOT in $PATH
-        # Tell user to load both main dependencies to avoid the OSError below
-        print(
-            "Are singularity and snakemake in your PATH? Please check before proceeding again!"
- ) - sys.exit("{}\n{}".format(e, e.output.decode("utf-8"))) - except OSError as e: - # Catch: OSError: [Errno 2] No such file or directory - # Occurs when command returns a non-zero exit-code - if e.errno == 2 and not exe_in_path("snakemake"): - # Failure caused because snakemake is NOT in $PATH - print( - "\x1b[6;37;41m\nError: Are snakemake AND singularity in your $PATH?\nPlease check before proceeding again!\x1b[0m", - file=sys.stderr, - ) - sys.exit("{}".format(e)) - else: - # Failure caused by unknown cause, raise error - raise e - - if write_to_file: - now = _now() - with open(os.path.join(outdir, "dryrun." + str(now) + ".log"), "w") as outfile: - outfile.write("{}".format(dryrun_output.decode("utf-8"))) - - return dryrun_output - - -def orchestrate( - mode, - outdir, - additional_bind_paths, - alt_cache, - threads=2, - submission_script="runner", - masterjob="pl:renee", - tmp_dir=None, - wait="", - hpcname="", -): - """Runs RENEE pipeline via selected executor: local or slurm. - If 'local' is selected, the pipeline is executed locally on a compute node/instance. - If 'slurm' is selected, jobs will be submitted to the cluster using SLURM job scheduler. - Support for additional job schedulers (i.e. PBS, SGE, LSF) may be added in the future. - @param outdir : - Pipeline output PATH - @param mode : - Execution method or mode: - local runs serially a compute instance without submitting to the cluster. - slurm will submit jobs to the cluster using the SLURM job scheduler. - @param additional_bind_paths : - Additional paths to bind to container filesystem (i.e. input file paths) - @param alt_cache : - Alternative singularity cache location - @param threads : - Number of threads to use for local execution method - @param submission_script : - Path to master jobs submission script: - renee run = /path/to/output/resources/runner - renee build = /path/to/output/resources/builder - @param masterjob : - Name of the master job - @param tmp_dir : - Absolute Path to temp dir for compute node - @param wait : - "--wait" to wait for master job to finish. This waits when pipeline is called via NIDAP API - @param hpcname : - "biowulf" if run on biowulf, "frce" if run on frce, blank otherwise. 
-        hpcname is determined in setup() function
-    @return masterjob <subprocess.Popen() object>:
-    """
-    # Add additional singularity bind PATHs
-    # to mount the local filesystem to the
-    # containers filesystem, NOTE: these
-    # PATHs must be an absolute PATHs
-    outdir = os.path.abspath(outdir)
-    # Add any default PATHs to bind to
-    # the container's filesystem, like
-    # tmp directories, /lscratch
-    addpaths = []
-    # set tmp_dir depending on hpc
-    tmp_dir = get_tmp_dir(tmp_dir, outdir)
-    temp = os.path.dirname(tmp_dir.rstrip("/"))
-    if temp == os.sep:
-        temp = tmp_dir.rstrip("/")
-    if outdir not in additional_bind_paths.split(","):
-        addpaths.append(outdir)
-    if temp not in additional_bind_paths.split(","):
-        addpaths.append(temp)
-    bindpaths = ",".join(addpaths)
-
-    # Set ENV variable 'SINGULARITY_CACHEDIR'
-    # to output directory
-    my_env = {}
-    my_env.update(os.environ)
-    cache = os.path.join(outdir, ".singularity")
-    my_env["SINGULARITY_CACHEDIR"] = cache
-
-    if alt_cache:
-        # Override the pipeline's default cache location
-        my_env["SINGULARITY_CACHEDIR"] = alt_cache
-        cache = alt_cache
-
-    if additional_bind_paths:
-        # Add Bind PATHs for outdir and tmp dir
-        if bindpaths:
-            bindpaths = ",{}".format(bindpaths)
-        bindpaths = "{}{}".format(additional_bind_paths, bindpaths)
-
-    if not exists(os.path.join(outdir, "logfiles")):
-        # Create directory for logfiles
-        os.makedirs(os.path.join(outdir, "logfiles"))
-
-    if exists(os.path.join(outdir, "logfiles", "snakemake.log")):
-        mtime = _get_file_mtime(os.path.join(outdir, "logfiles", "snakemake.log"))
-        newname = os.path.join(outdir, "logfiles", "snakemake." + str(mtime) + ".log")
-        os.rename(os.path.join(outdir, "logfiles", "snakemake.log"), newname)
-
-    # Create .singularity directory for installations of snakemake
-    # without setuid which create a sandbox in the SINGULARITY_CACHEDIR
-    if not exists(cache):
-        # Create directory for sandbox and image layers
-        os.makedirs(cache)
-
-    # Run on compute node or instance without submitting jobs to a scheduler
-    if mode == "local":
-        # Run RENEE: instantiate main/master process
-        # Look into later: it maybe worth replacing Popen subprocess with a direct
-        # snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html
-        # Create log file for pipeline
-        logfh = open(os.path.join(outdir, "logfiles", "snakemake.log"), "w")
-        masterjob = subprocess.Popen(
-            [
-                "snakemake",
-                "-pr",
-                "--use-singularity",
-                "--singularity-args",
-                "'-B {}'".format(bindpaths),
-                "--cores",
-                str(threads),
-                "--configfile=config.json",
-            ],
-            cwd=outdir,
-            env=my_env,
-        )
-
-    # Submitting jobs to cluster via SLURM's job scheduler
-    elif mode == "slurm":
-        # Run RENEE: instantiate main/master process
-        # Look into later: it maybe worth replacing Popen subprocess with a direct
-        # snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html
-        # snakemake --latency-wait 120 -s $R/Snakefile -d $R --printshellcmds
-        #    --cluster-config $R/cluster.json --keep-going --restart-times 3
-        #    --cluster "sbatch --gres {cluster.gres} --cpus-per-task {cluster.threads} -p {cluster.partition} -t {cluster.time} --mem {cluster.mem} --job-name={params.rname}"
-        #    -j 500 --rerun-incomplete --stats $R/Reports/initialqc.stats -T
-        #    2>&1| tee -a $R/Reports/snakemake.log
-
-        # Create log file for master job information
-        logfh = open(os.path.join(outdir, "logfiles", "master.log"), "w")
-        # submission_script for renee run is /path/to/output/resources/runner
-        # submission_script for renee build is /path/to/output/resources/builder
-        cmdlist = [
-            str(os.path.join(outdir, "resources", str(submission_script))),
-            mode,
-            "-j",
-            str(masterjob),
-            "-b",
-            str(bindpaths),
-            "-o",
-            str(outdir),
-            "-c",
-            str(cache),
-            "-t",
-            str(tmp_dir),
-        ]
-        if str(wait) == "--wait":
-            cmdlist.append("-w")
-        if str(hpcname) != "":
-            cmdlist.append("-n")
-            cmdlist.append(hpcname)
-        else:
-            cmdlist.append("-n")
-            cmdlist.append("unknown")
-
-        print(" ".join(cmdlist))
-        masterjob = subprocess.Popen(
-            cmdlist, cwd=outdir, stderr=subprocess.STDOUT, stdout=logfh, env=my_env
-        )
-
-    return masterjob
-
-
-def resolve_additional_bind_paths(search_paths):
-    """Finds additional singularity bind paths from a list of random paths. Paths are
-    indexed with a compostite key containing the first two directories of an absolute
-    file path to avoid issues related to shared names across the /gpfs shared network
-    filesystem. For each indexed list of file paths, a common path is found. Assumes
-    that the paths provided are absolute paths, the renee build sub command creates
-    resource file index with absolute filenames.
-    @param search_paths list[<str>]:
-        List of absolute file paths to find common bind paths from
-    @return common_paths list[<str>]:
-        Returns a list of common shared file paths to create additional singularity bind paths
-    """
-    common_paths = []
-    indexed_paths = {}
-
-    for ref in search_paths:
-        # Skip over resources with remote URI and
-        # skip over strings that are not file PATHS as
-        # RENEE build creates absolute resource PATHS
-        if (
-            ref.lower().startswith("sftp://")
-            or ref.lower().startswith("s3://")
-            or ref.lower().startswith("gs://")
-            or not ref.lower().startswith(os.sep)
-        ):
-            continue
-
-        # Break up path into directory tokens
-        for r in [
-            ref,
-            str(Path(ref).resolve()),
-        ]:  # taking care of paths which are symlinks!
-            path_list = os.path.abspath(r).split(os.sep)
-
-            try:  # Create composite index from first two directories
-                # Avoids issues created by shared /gpfs/ PATHS
-                index = path_list[1:3]
-                index = tuple(index)
-            except IndexError:
-                index = path_list[1]  # ref startswith /
-            if index not in indexed_paths:
-                indexed_paths[index] = []
-            # Create an INDEX to find common PATHS for each root child directory
-            # like /scratch or /data. This prevents issues when trying to find the
-            # common path between these two different directories (resolves to /)
-            indexed_paths[index].append(str(os.sep).join(path_list))
-
-    for index, paths in indexed_paths.items():
-        # Find common paths for each path index
-        common_paths.append(os.path.dirname(os.path.commonprefix(paths)))
-
-    return list(set(common_paths))
-
-
-def run(sub_args):
-    """Initialize, setup, and run the RENEE pipeline.
-    Calls initialize() to create output directory and copy over pipeline resources,
-    setup() to create the pipeline config file, dryrun() to ensure their are no issues
-    before running the pipeline, and finally run() to execute the Snakemake workflow.
-    @param sub_args <parser.parse_args() object>:
-        Parsed arguments for run sub-command
-    """
-    # Get PATH to RENEE git repository for copying over pipeline resources
-
-    # hpcname is either biowulf, frce, or blank
-    hpcname = get_hpcname()
-    if sub_args.runmode == "init" or not os.path.exists(
-        os.path.join(sub_args.output, "config.json")
-    ):
-        # Initialize working directory, copy over required pipeline resources
-        input_files = initialize(
-            sub_args, repo_path=RENEE_PATH, output_path=sub_args.output
-        )
-
-        # Step pipeline for execution, create config.json config file from templates
-        config = setup(
-            sub_args,
-            ifiles=input_files,
-            repo_path=RENEE_PATH,
-            output_path=sub_args.output,
-        )
-    # load config from existing file
-    else:
-        with open(os.path.join(sub_args.output, "config.json"), "r") as config_file:
-            config = json.load(config_file)
-
-    # ensure the working dir is read/write friendly
-    scripts_path = os.path.join(sub_args.output, "workflow", "scripts")
-    os.chmod(scripts_path, 0o755)
-
-    # Optional Step: Dry-run pipeline
-    if sub_args.dry_run:
-        dryrun_output = dryrun(
-            outdir=sub_args.output
-        )  # python3 returns byte-string representation
-        print("\nDry-running RENEE pipeline:\n{}".format(dryrun_output.decode("utf-8")))
-        # sys.exit(0)  # DONT exit now ... exit after printing singularity bind paths
-
-    # determine "wait"
-    wait = ""
-    if sub_args.wait:
-        wait = "--wait"
-
-    # Resolve all Singularity Bindpaths
-    rawdata_bind_paths = config["project"]["datapath"]
-
-    # Get FastQ Screen Database paths
-    # and other reference genome file paths
-    fqscreen_cfg1 = config["bin"]["rnaseq"]["tool_parameters"]["FASTQ_SCREEN_CONFIG"]
-    fqscreen_cfg2 = config["bin"]["rnaseq"]["tool_parameters"]["FASTQ_SCREEN_CONFIG2"]
-    fq_screen_paths = get_fastq_screen_paths(
-        [
-            os.path.join(sub_args.output, fqscreen_cfg1),
-            os.path.join(sub_args.output, fqscreen_cfg2),
-        ]
-    )
-    kraken_db_path = [config["bin"]["rnaseq"]["tool_parameters"]["KRAKENBACDB"]]
-    genome_bind_paths = resolve_additional_bind_paths(
-        list(config["references"]["rnaseq"].values()) + fq_screen_paths + kraken_db_path
-    )
-    all_bind_paths = "{},{}".format(",".join(genome_bind_paths), rawdata_bind_paths)
-
-    if sub_args.dry_run:  # print singularity bind baths and exit
-        print("\nSingularity Bind Paths:{}".format(all_bind_paths))
-        sys.exit(0)
-
-    # Run pipeline
-    masterjob = orchestrate(
-        mode=sub_args.mode,
-        outdir=sub_args.output,
-        additional_bind_paths=all_bind_paths,
-        alt_cache=sub_args.singularity_cache,
-        threads=sub_args.threads,
-        tmp_dir=get_tmp_dir(sub_args.tmp_dir, sub_args.output),
-        wait=wait,
-        hpcname=hpcname,
-    )
-
-    # Wait for subprocess to complete,
-    # this is blocking
-    masterjob.wait()
-
-    # Relay information about submission
-    # of the master job or the exit code of the
-    # pipeline that ran in local mode
-    if sub_args.mode == "local":
-        if int(masterjob.returncode) == 0:
-            print("{} pipeline has successfully completed".format("RENEE"))
-        else:
-            fatal(
-                "{} pipeline failed. Please see standard output for more information.".format(
-                    "RENEE"
-                )
-            )
-    elif sub_args.mode == "slurm":
-        jobid = (
-            open(os.path.join(sub_args.output, "logfiles", "mjobid.log")).read().strip()
-        )
-        if int(masterjob.returncode) == 0:
-            print("Successfully submitted master job: ", end="")
-        else:
-            fatal(
-                "Error occurred when submitting the master job. Error code = {}".format(
-                    masterjob.returncode
-                )
-            )
-        print(jobid)
-
-
 def unlock(sub_args):
     """Unlocks a previous runs output directory.
     If snakemake fails ungracefully, it maybe required to unlock the working
     directory before proceeding again.
@@ -1244,7 +173,9 @@ def unlock(sub_args):
             cwd=outdir,
             stderr=subprocess.STDOUT,
         )
-    except subprocess.CalledProcessError as e:
+    except (
+        subprocess.CalledProcessError
+    ) as e:  # TODO: why capture this exception at all?
         # Unlocking process returned a non-zero exit code
         sys.exit("{}\n{}".format(e, e.output))

@@ -1275,7 +206,7 @@ def _sym_refs(input_data, target, make_copy=False):
         source_name = os.path.abspath(os.path.realpath(file))
         canocial_input_paths.append(os.path.dirname(source_name))

-        if not exists(target_name):
+        if not os.path.exists(target_name):
            if not make_copy:
                # Create a symlink if it does not already exist
                # Follow source symlinks to resolve any binding issues
@@ -1350,11 +281,11 @@ def configure_build(sub_args, git_repo, output_path):
     @return additional_bind_paths list[<str>]:
         List of canonical paths for the list of input files to be added singularity bindpath
     """
-    if not exists(output_path):
+    if not os.path.exists(output_path):
         # Pipeline output directory does not exist on filesystem
         os.makedirs(output_path)

-    elif exists(output_path) and os.path.isfile(output_path):
+    elif os.path.exists(output_path) and os.path.isfile(output_path):
         # Provided Path for pipeline output directory exists as file
         raise OSError(
             """\n\tFatal: Failed to create provided pipeline output directory!
@@ -1490,10 +421,10 @@ def cache(sub_args):
     images = os.path.join(repo_path, "config", "containers", "images.json")

     # Create image cache
-    if not exists(sif_cache):
+    if not os.path.exists(sif_cache):
         # Pipeline output directory does not exist on filesystem
         os.makedirs(sif_cache)
-    elif exists(sif_cache) and os.path.isfile(sif_cache):
+    elif os.path.exists(sif_cache) and os.path.isfile(sif_cache):
         # Provided Path for pipeline output directory exists as file
         raise OSError(
             """\n\tFatal: Failed to create provided sif cache directory!
@@ -1513,7 +444,7 @@ def cache(sub_args):
             sif = os.path.join(
                 sif_cache, "{}.sif".format(os.path.basename(uri).replace(":", "_"))
             )
-            if not exists(sif):
+            if not os.path.exists(sif):
                 # If local sif does not exist on in cache, print warning
                 # and default to pulling from URI in config/containers/images.json
                 print('Image will be pulled from "{}".'.format(uri), file=sys.stderr)
@@ -1573,7 +504,7 @@ def genome_options(parser, user_option, prebuilt):
     # Checks against valid pre-built options
     # TODO: makes this more dynamic in the future to have it check against
     # a list of genomes (files) in config/genomes/*.json
-    elif not user_option in prebuilt:
+    elif user_option not in prebuilt:
         # User did NOT provide a valid choice
         parser.error(
             """provided invalid choice, '{}', to --genome argument!\n
@@ -1608,9 +539,7 @@ def parsed_arguments(name, description):
     description = "{0}{1}{2}".format(c.bold, description, c.end)

     # Create a top-level parser
-    parser = argparse.ArgumentParser(
-        description="{}: {}".format(styled_name, description)
-    )
+    parser = argparse.ArgumentParser(prog="renee", description=description)

     # Adding Version information
     parser.add_argument(
@@ -1841,7 +770,7 @@ def parsed_arguments(name, description):
        {2}{3}Prebuilt genome+annotation combos:{4}
          {5}
        """.format(
-            "renee", __version__, c.bold, c.url, c.end, list(GENOMES_LIST)
+            "renee", __version__, c.bold, c.url, c.end, list(get_genomes_list())
        )
    )

@@ -1856,6 +785,12 @@ def parsed_arguments(name, description):
        add_help=False,
    )

+    subparser_gui = subparsers.add_parser(
+        "gui",
+        help="Launch the RENEE pipeline with a Graphical User Interface (GUI)",
+        description="",
+    )
+
     # Required Arguments
     # Input FastQ files
     subparser_run.add_argument(
@@ -1881,7 +816,9 @@ def parsed_arguments(name, description):
     subparser_run.add_argument(
         "--genome",
         required=True,
-        type=lambda option: str(genome_options(subparser_run, option, GENOMES_LIST)),
+        type=lambda option: str(
+            genome_options(subparser_run, option, get_genomes_list())
+        ),
         help=argparse.SUPPRESS,
     )

@@ -1973,6 +910,7 @@ def parsed_arguments(name, description):
         type=lambda option: os.path.abspath(os.path.expanduser(option)),
         required=False,
         help=argparse.SUPPRESS,
+        default=get_sif_cache_dir(hpc=get_hpcname()),
     )

     # Create NIDAP output folder
@@ -2175,7 +1113,7 @@ def parsed_arguments(name, description):
          --output /data/$USER/refs/mm39_M26 \\
          --dry-run

-        # Step 2A.) Build RENEE reference files
+        # Step 2A.) Build {0} reference files
        renee build --ref-fa GRCm39.primary_assembly.genome.fa \\
            --ref-name mm39 \\
            --ref-gtf gencode.vM26.annotation.gtf \\
@@ -2188,7 +1126,7 @@ def parsed_arguments(name, description):
        {2}{3}Prebuilt genome+annotation combos:{4}
          {5}
        """.format(
-            "renee", __version__, c.bold, c.url, c.end, list(GENOMES_LIST)
+            "renee", __version__, c.bold, c.url, c.end, list(get_genomes_list())
        )
    )

@@ -2516,6 +1454,7 @@ def parsed_arguments(name, description):
     subparser_unlock.set_defaults(func=unlock)
     subparser_build.set_defaults(func=build)
     subparser_cache.set_defaults(func=cache)
+    subparser_gui.set_defaults(func=launch_gui)

     # Parse command-line args
     args = parser.parse_args()
diff --git a/src/renee/cache.py b/src/renee/cache.py
new file mode 100644
index 0000000..a908634
--- /dev/null
+++ b/src/renee/cache.py
@@ -0,0 +1,63 @@
+import json
+import os
+import sys
+
+
+def get_singularity_cachedir(output_dir, cache_dir=None):
+    """Returns the singularity cache directory.
+    If the user does not provide a cache directory, the default
+    singularity cache is in the output directory.
+    """
+    if not cache_dir:
+        cache_dir = os.path.join(output_dir, ".singularity")
+    return cache_dir
+
+
+def get_sif_cache_dir(hpc=None):
+    sif_dir = None
+    if hpc == "biowulf":
+        sif_dir = "/data/CCBR_Pipeliner/SIFS"
+    elif hpc == "frce":
+        sif_dir = "/mnt/projects/CCBR-Pipelines/SIFs"
+    return sif_dir
+
+
+def image_cache(sub_args, config):
+    """Adds Docker Image URIs, or SIF paths to config if singularity cache option is provided.
+    If singularity cache option is provided and a local SIF does not exist, a warning is
+    displayed and the image will be pulled from URI in 'config/containers/images.json'.
+    @param sub_args <parser.parse_args() object>:
+        Parsed arguments for run sub-command
+    @params config <file>:
+        Docker Image config file
+    @return config <dict>:
+        Updated config dictionary containing user information (username and home directory)
+    """
+    images = os.path.join(sub_args.output, "config", "containers", "images.json")
+
+    # Read in config for docker image uris
+    with open(images, "r") as fh:
+        data = json.load(fh)
+    # Check if local sif exists
+    for image, uri in data["images"].items():
+        if sub_args.sif_cache:
+            sif = os.path.join(
+                sub_args.sif_cache,
+                "{}.sif".format(os.path.basename(uri).replace(":", "_")),
+            )
+            if not os.path.exists(sif):
+                # If local sif does not exist in the cache, print a warning
+                # and default to pulling from URI in config/containers/images.json
+                print(
+                    'Warning: Local image "{}" does not exist in singularity cache'.format(
+                        sif
+                    ),
+                    file=sys.stderr,
+                )
+            else:
+                # Change pointer to image from Registry URI to local SIF
+                data["images"][image] = sif
+
+    config.update(data)
+
+    return config
diff --git a/src/renee/conditions.py b/src/renee/conditions.py
new file mode 100644
index 0000000..021870f
--- /dev/null
+++ b/src/renee/conditions.py
@@ -0,0 +1,25 @@
+import sys
+
+
+def fatal(*message, **kwargs):
+    """Prints any provided args to standard error
+    and exits with an exit code of 1.
+    @param message <any>:
+        Values printed to standard error
+    @params kwargs <print()>
+        Key words to modify print function behavior
+    """
+    err(*message, **kwargs)
+    sys.exit(1)
+
+
+def err(*message, **kwargs):
+    """Prints any provided args to standard error.
+    kwargs can be provided to modify print functions
+    behavior.
+    @param message <any>:
+        Values printed to standard error
+    @params kwargs <print()>
+        Key words to modify print function behavior
+    """
+    print(*message, file=sys.stderr, **kwargs)
diff --git a/src/renee/dryrun.py b/src/renee/dryrun.py
new file mode 100644
index 0000000..93d6231
--- /dev/null
+++ b/src/renee/dryrun.py
@@ -0,0 +1,89 @@
+import datetime
+import os
+import subprocess
+import sys
+
+
+def dryrun(
+    outdir,
+    config="config.json",
+    snakefile=os.path.join("workflow", "Snakefile"),
+    write_to_file=True,
+):
+    """Dryruns the pipeline to ensure there are no errors prior to running.
+    @param outdir <str>:
+        Pipeline output PATH
+    @return dryrun_output <str>:
+        Byte string representation of dryrun command
+    """
+    try:
+        dryrun_output = subprocess.check_output(
+            [
+                "snakemake",
+                "-npr",
+                "-s",
+                str(snakefile),
+                "--use-singularity",
+                "--rerun-incomplete",
+                "--cores",
+                "4",
+                "--configfile={}".format(config),
+            ],
+            cwd=outdir,
+            stderr=subprocess.STDOUT,
+        )
+
+    except subprocess.CalledProcessError as e:
+        # Singularity is NOT in $PATH
+        # Tell user to load both main dependencies to avoid the OSError below
+        print(
+            "Are singularity and snakemake in your PATH? Please check before proceeding again!"
+        )
+        sys.exit("{}\n{}".format(e, e.output.decode("utf-8")))
+    except OSError as e:
+        # Catch: OSError: [Errno 2] No such file or directory
+        # Occurs when command returns a non-zero exit-code
+        if e.errno == 2 and not exe_in_path("snakemake"):
+            # Failure caused because snakemake is NOT in $PATH
+            print(
+                "\x1b[6;37;41m\nError: Are snakemake AND singularity in your $PATH?\nPlease check before proceeding again!\x1b[0m",
+                file=sys.stderr,
+            )
+            sys.exit("{}".format(e))
+        else:
+            # Failure caused by unknown cause, raise error
+            raise e
+
+    if write_to_file:
+        now = _now()
+        with open(os.path.join(outdir, "dryrun." + str(now) + ".log"), "w") as outfile:
+            outfile.write("{}".format(dryrun_output.decode("utf-8")))
+
+    return dryrun_output
+
+
+def _now():
+    ct = datetime.datetime.now()
+    now = ct.strftime("%y%m%d%H%M%S")
+    return now
+
+
+def exe_in_path(cmd, path=None):
+    """Checks if an executable is in $PATH
+    @param cmd <str>:
+        Name of executable to check
+    @param path <list[str]>:
+        Optional list of PATHs to check [default: $PATH]
+    @return <boolean>:
+        True if exe in PATH, False if not in PATH
+    """
+    if path is None:
+        path = os.environ["PATH"].split(os.pathsep)
+
+    for prefix in path:
+        filename = os.path.join(prefix, cmd)
+        executable = os.access(filename, os.X_OK)
+        is_not_directory = os.path.isfile(filename)
+        if executable and is_not_directory:
+            return True
+    return False
diff --git a/src/renee/gui.py b/src/renee/gui.py
new file mode 100755
index 0000000..12aad81
--- /dev/null
+++ b/src/renee/gui.py
@@ -0,0 +1,275 @@
+#!/usr/bin/env python3
+import argparse
+import glob
+import io
+import os
+import PySimpleGUI as sg
+import sys
+from tkinter import Tk
+
+from .util import (
+    get_genomes_dict,
+    get_tmp_dir,
+    get_shared_resources_dir,
+    renee_base,
+    get_version,
+    get_singularity_cachedir,
+    get_hpcname,
+)
+from .cache import get_sif_cache_dir
+from .run import run_in_context
+
+# TODO: get rid of all the global variables
+# TODO: let's use a tmp dir and put these files there instead. see for inspiration: https://github.com/CCBR/RENEE/blob/16d13dca1d5f0f43c7dfda379efb882a67635d17/tests/test_cache.py#L14-L28
+global FILES_TO_DELETE
+FILES_TO_DELETE = list()
+
+
+def launch_gui(sub_args=None, debug=True):
+    # get drop down genome+annotation options
+    jsons = get_genomes_dict(error_on_warnings=True)
+    genome_annotation_combinations = list(jsons.keys())
+    genome_annotation_combinations.sort()
+    if debug:
+        print(jsons)
+    if debug:
+        print(genome_annotation_combinations)
+
+    logo = sg.Image(renee_base(os.path.join("resources", "CCBRlogo.png")))
+    # create layout
+    layout = [
+        [sg.Column([[logo]], justification="center")],
+        [
+            sg.Text(
+                "RENEE - Rna sEquencing aNalysis pipElinE", font=("Arial", 14, "bold")
+            )
+        ],
+        [
+            sg.Text(
+                "Input Fastqs folder", font=("Helvetica", 12, "bold"), size=(20, 1)
+            ),
+            sg.InputText(key="--INDIR--"),
+            sg.FolderBrowse(target="--INDIR--"),
+        ],
+        [
+            sg.Text("Output folder", font=("Helvetica", 12, "bold"), size=(20, 1)),
+            sg.InputText(key="--OUTDIR--"),
+            sg.FolderBrowse(target="--OUTDIR--"),
+        ],
+        [
+            sg.Text("Genome+Annotation", font=("Helvetica", 12, "bold"), size=(20, 1)),
+            sg.Combo(
+                values=genome_annotation_combinations,
+                key="--ANNOTATION--",
+                tooltip="eg. hg38_30 for Genome=hg38 & Gencode_Annotation=version 30",
+            ),
+        ],
+        [
+            sg.Submit(key="--SUBMIT--", font=("Helvetica", 12)),
+            sg.Cancel(key="--CANCEL--", font=("Helvetica", 12)),
+            sg.Button(
+                button_text="Documentation", key="--DOC--", font=("Helvetica", 12)
+            ),
+            sg.Button(button_text="Help", key="--HELP--", font=("Helvetica", 12)),
+        ],
+    ]
+    if debug:
+        print("layout is ready!")
+
+    window = sg.Window(
+        f"RENEE {get_version()}", layout, location=(0, 500), finalize=True
+    )
+    if debug:
+        print("window created!")
+
+    while True:
+        event, values = window.read()
+        if debug:
+            print(event, values)
+        # if any((event != 'Submit')):
+        if event == "--CANCEL--" or event == sg.WIN_CLOSED:
+            sg.popup_auto_close(
+                "Thank you for running RENEE. GoodBye!",
+                location=(0, 500),
+                title="",
+                font=("Arial", 12, "bold"),
+            )
+            sys.exit(69)
+        if event == "--DOC--":
+            copy_to_clipboard("https://ccbr.github.io/RENEE/")
+            sg.Popup(
+                "Visit https://ccbr.github.io/RENEE/ for links to complete documentation. The link has been copied to your clipboard. Please paste it in your favorite web browser.",
+                font=("Arial", 12, "bold"),
+                location=(0, 500),
+            )
+            continue
+        if event == "--HELP--":
+            copy_to_clipboard("ccbr_pipeliner@mail.nih.gov")
+            sg.Popup(
+                "Email ccbr_pipeliner@mail.nih.gov for help. The email id has been copied to your clipboard. Please paste it in your emailing software.",
+                font=("Arial", 12, "bold"),
+                location=(0, 500),
+            )
+            continue
+        if event == "--SUBMIT--":
+            if values["--INDIR--"] == "":
+                sg.PopupError(
+                    "Input folder must be provided!!",
+                    location=(0, 500),
+                    title="ERROR!",
+                    font=("Arial", 12, "bold"),
+                )
+                continue
+            elif not os.path.exists(values["--INDIR--"]) and not os.path.exists(
+                fixpath(values["--INDIR--"])
+            ):
+                if debug:
+                    print(values["--INDIR--"])
+                if debug:
+                    print(fixpath(values["--INDIR--"]))
+                sg.PopupError(
+                    "Input folder doesn't exist!!",
+                    location=(0, 500),
+                    title="ERROR!",
+                    font=("Arial", 12, "bold"),
+                )
+                continue
+            else:
+                inputfastqs = get_fastqs(values["--INDIR--"])
+                if debug:
+                    print(inputfastqs)
+                if len(inputfastqs) == 0:
+                    sg.PopupError(
+                        "Input folder has no fastqs!!",
+                        location=(0, 500),
+                        title="ERROR!",
+                        font=("Arial", 12, "bold"),
+                    )
+                    window.Element("--INDIR--").update("")
+                    continue
+            if values["--OUTDIR--"] == "":
+                sg.PopupError(
+                    "Output folder must be provided!!",
+                    location=(0, 500),
+                    title="ERROR",
+                    font=("Arial", 12, "bold"),
+                )
+                continue
+            elif os.path.exists(values["--OUTDIR--"]) and not os.path.exists(
+                fixpath(values["--OUTDIR--"])
+            ):
+                ch = sg.popup_yes_no(
+                    "Output folder exists... this is probably a re-run ... proceed?",
+                    title="Rerun?",
+                    location=(0, 500),
+                    font=("Arial", 12, "bold"),
+                )
+                if ch == "No":
+                    window.Element("--OUTDIR--").update("")
+                    continue
+            # sg.Popup("Output folder exists... this is probably a re-run ... is it?",location=(0,500))
is it?",location=(0,500)) + genome = jsons[values["--ANNOTATION--"]] + output_dir = values["--OUTDIR--"] + # create sub args for renee run + run_args = argparse.Namespace( + input=inputfastqs, + output=output_dir, + genome=genome, + mode="slurm", + runmode="run", + dry_run=True, + sif_cache=get_sif_cache_dir(hpc=get_hpcname()), + singularity_cache=get_singularity_cachedir( + output_dir, os.environ.get("SINGULARITY_CACHEDIR", None) + ), + tmp_dir=get_tmp_dir(None, output_dir), + shared_resources=get_shared_resources_dir(None), + star_2_pass_basic=False, + small_rna=False, + create_nidap_folder=False, + wait=False, + threads=2, + ) + # execute dry run and capture stdout/stderr + allout = run_in_context(run_args) + sg.popup_scrolled( + allout, + title="Dryrun:STDOUT/STDERR", + font=("Monaco", 10), + location=(0, 500), + size=(80, 30), + ) + # TODO use a regex to simplify this line + if "error" in allout or "Error" in allout or "ERROR" in allout: + continue + ch = sg.popup_yes_no( + "Submit run to slurm?", + title="Submit??", + location=(0, 500), + font=("Arial", 12, "bold"), + ) + if ch == "Yes": + run_args.dry_run = False + # execute live run + allout = run_in_context(run_args) + sg.popup_scrolled( + allout, + title="Dryrun:STDOUT/STDERR", + font=("Monaco", 10), + location=(0, 500), + size=(80, 30), + ) + sg.popup_scrolled( + allout, + title="Slurmrun:STDOUT/STDERR", + font=("Monaco", 10), + location=(0, 500), + size=(80, 30), + ) + sg.popup_auto_close( + "Thank you for running RENEE. GoodBye!", + location=(0, 500), + title="", + font=("Arial", 12, "bold"), + ) + break + elif ch == "No": + window.Element("--INDIR--").update("") + window.Element("--OUTDIR--").update("") + window.Element("--ANNOTATION--").update("") + continue + + window.close() + if len(FILES_TO_DELETE) != 0: + delete_files(FILES_TO_DELETE) + + +def copy_to_clipboard(string): + r = Tk() + r.withdraw() + r.clipboard_clear() + r.clipboard_append(string) + r.update() + r.destroy() + + +def fixpath(p): + return os.path.abspath(os.path.expanduser(p)) + + +def get_fastqs(inputdir): + inputdir = fixpath(inputdir) + inputfastqs = glob.glob(inputdir + os.sep + "*.fastq.gz") + inputfqs = glob.glob(inputdir + os.sep + "*.fq.gz") + inputfastqs.extend(inputfqs) + return inputfastqs + + +def delete_files(files): + for f in files: + if os.path.exists(f): + os.remove(f) + + +if __name__ == "__main__": + launch_gui() diff --git a/src/renee/initialize.py b/src/renee/initialize.py new file mode 100644 index 0000000..75d2207 --- /dev/null +++ b/src/renee/initialize.py @@ -0,0 +1,144 @@ +import os +import re +import sys + +from .util import ( + _cp_r_safe_, +) + + +def initialize(sub_args, repo_path, output_path): + """Initialize the output directory and copy over required pipeline resources. + If user provides a output directory path that already exists on the filesystem + as a file (small chance of happening but possible), a OSError is raised. If the + output directory PATH already EXISTS, it will not try to create the directory. + If a resource also already exists in the output directory (i.e. output/workflow), + it will not try to copy over that directory. In the future, it maybe worth adding + an optional cli arg called --force, that can modify this behavior. Returns a list + of renamed FastQ files (i.e. renamed symlinks). 
+    @param sub_args :
+        Parsed arguments for run sub-command
+    @param repo_path :
+        Path to RENEE source code and its templates
+    @param output_path :
+        Pipeline output path, created if it does not exist
+    @return inputs list[]:
+        List of pipeline's input FastQ files
+    """
+    if not os.path.exists(output_path):
+        # Pipeline output directory does not exist on filesystem
+        os.makedirs(output_path)
+
+    elif os.path.exists(output_path) and os.path.isfile(output_path):
+        # Provided Path for pipeline output directory exists as file
+        raise OSError(
+            """\n\tFatal: Failed to create provided pipeline output directory!
+            User provided --output PATH already exists on the filesystem as a file.
+            Please run {} again with a different --output PATH.
+            """.format(
+                sys.argv[0]
+            )
+        )
+
+    # Copy over templates and other required resources
+    _cp_r_safe_(
+        source=repo_path,
+        target=output_path,
+        resources=["workflow", "resources", "config"],
+    )
+
+    # Create renamed symlinks to rawdata
+    inputs = _sym_safe_(input_data=sub_args.input, target=output_path)
+
+    return inputs
+
+
+def _sym_safe_(input_data, target):
+    """Creates re-named symlinks for each FastQ file provided
+    as input. If a symlink already exists, it will not try to create a new symlink.
+    If a relative source PATH is provided, it will be converted to an absolute PATH.
+    @param input_data list[]:
+        List of input files to symlink to target location
+    @param target :
+        Target path to copy templates and required resources
+    @return input_fastqs list[]:
+        List of renamed input FastQs
+    """
+    input_fastqs = []  # store renamed fastq file names
+    for file in input_data:
+        filename = os.path.basename(file)
+        renamed = os.path.join(target, rename(filename))
+        input_fastqs.append(renamed)
+
+        if not os.path.exists(renamed):
+            # Create a symlink if it does not already exist
+            # Follow source symlinks to resolve any binding issues
+            os.symlink(os.path.abspath(os.path.realpath(file)), renamed)
+
+    return input_fastqs
+
+
+def rename(filename):
+    """Dynamically renames a FastQ file to have one of the following extensions: *.R1.fastq.gz, *.R2.fastq.gz
+    To automatically rename the fastq files, a few assumptions are made. If the extension of the
+    FastQ file cannot be inferred, an exception is raised telling the user to fix the filename
+    of the fastq files.
+    @param filename :
+        Original name of file to be renamed
+    @return filename :
+        A renamed FastQ filename
+    """
+    # Covers common extensions from SF, SRA, EBI, TCGA, and external sequencing providers
+    # key = regex to match string and value = how it will be renamed
+    extensions = {
+        # Matches: _R[12]_fastq.gz, _R[12].fastq.gz, _R[12]_fq.gz, etc.
+        ".R1.f(ast)?q.gz$": ".R1.fastq.gz",
+        ".R2.f(ast)?q.gz$": ".R2.fastq.gz",
+        # Matches: _R[12]_001_fastq_gz, _R[12].001.fastq.gz, _R[12]_001.fq.gz, etc.
+        # Capture lane information as named group
+        ".R1.(?P<lane>...).f(ast)?q.gz$": ".R1.fastq.gz",
+        ".R2.(?P<lane>...).f(ast)?q.gz$": ".R2.fastq.gz",
+        # Matches: _[12].fastq.gz, _[12].fq.gz, _[12]_fastq_gz, etc.
+        "_1.f(ast)?q.gz$": ".R1.fastq.gz",
+        "_2.f(ast)?q.gz$": ".R2.fastq.gz",
+    }
+
+    if filename.endswith(".R1.fastq.gz") or filename.endswith(".R2.fastq.gz"):
+        # Filename is already in the correct format
+        return filename
+
+    converted = False
+    for regex, new_ext in extensions.items():
+        matched = re.search(regex, filename)
+        if matched:
+            # regex matches with a pattern in extensions
+            converted = True
+            # Try to get substring for named group lane, retain this in the new file extension
+            # Come back to this later, I am not sure if this is necessary
+            # That string may be static (i.e. always the same)
+            # https://support.illumina.com/help/BaseSpace_OLH_009008/Content/Source/Informatics/BS/NamingConvention_FASTQ-files-swBS.htm#
+            try:
+                new_ext = "_{}{}".format(matched.group("lane"), new_ext)
+            except IndexError:
+                pass  # Does not contain the named group lane
+
+            filename = re.sub(regex, new_ext, filename)
+            break  # only rename once
+
+    if not converted:
+        raise NameError(
+            """\n\tFatal: Failed to rename provided input '{}'!
+            Cannot determine the extension of the user provided input file.
+            Please rename the file listed above before trying again.
+            Here is an example of acceptable input file extensions:
+              sampleName.R1.fastq.gz      sampleName.R2.fastq.gz
+              sampleName_R1_001.fastq.gz  sampleName_R2_001.fastq.gz
+              sampleName_1.fastq.gz       sampleName_2.fastq.gz
+            Please also check that your input files are gzipped.
+            If they are not, please gzip them before proceeding again.
+            """.format(
+                filename
+            )
+        )
+
+    return filename
diff --git a/src/renee/run.py b/src/renee/run.py
new file mode 100644
index 0000000..bc7cd0e
--- /dev/null
+++ b/src/renee/run.py
@@ -0,0 +1,216 @@
+import contextlib
+import io
+import json
+import os
+import pathlib
+import sys
+
+from .util import renee_base, get_hpcname, get_tmp_dir, orchestrate
+from .conditions import fatal
+from .initialize import initialize
+from .setup import setup
+from .dryrun import dryrun
+
+
+def run(sub_args):
+    """Initialize, setup, and run the RENEE pipeline.
+    Calls initialize() to create the output directory and copy over pipeline resources,
+    setup() to create the pipeline config file, dryrun() to ensure there are no issues
+    before running the pipeline, and finally run() to execute the Snakemake workflow.
+    @param sub_args :
+        Parsed arguments for run sub-command
+    """
+    # Get PATH to RENEE git repository for copying over pipeline resources
+
+    # hpcname is either biowulf, frce, or blank
+    hpcname = get_hpcname()
+    if sub_args.runmode == "init" or not os.path.exists(
+        os.path.join(sub_args.output, "config.json")
+    ):
+        # Initialize working directory, copy over required pipeline resources
+        input_files = initialize(
+            sub_args, repo_path=renee_base(), output_path=sub_args.output
+        )
+
+        # Set up pipeline for execution, create config.json config file from templates
+        config = setup(
+            sub_args,
+            ifiles=input_files,
+            repo_path=renee_base(),
+            output_path=sub_args.output,
+        )
+    # load config from existing file
+    else:
+        with open(os.path.join(sub_args.output, "config.json"), "r") as config_file:
+            config = json.load(config_file)
+
+    # ensure the working dir is read/write friendly
+    scripts_path = os.path.join(sub_args.output, "workflow", "scripts")
+    os.chmod(scripts_path, 0o755)
+
+    # Optional Step: Dry-run pipeline
+    if sub_args.dry_run:
+        dryrun_output = dryrun(
+            outdir=sub_args.output
+        )  # python3 returns byte-string representation
+        print("\nDry-running RENEE pipeline:\n{}".format(dryrun_output.decode("utf-8")))
+        # sys.exit(0)  # DON'T exit now ... exit after printing the singularity bind paths
+
+    # determine "wait"
+    wait = ""
+    if sub_args.wait:
+        wait = "--wait"
+
+    # Resolve all Singularity Bindpaths
+    rawdata_bind_paths = config["project"]["datapath"]
+
+    # Get FastQ Screen Database paths
+    # and other reference genome file paths
+    fqscreen_cfg1 = config["bin"]["rnaseq"]["tool_parameters"]["FASTQ_SCREEN_CONFIG"]
+    fqscreen_cfg2 = config["bin"]["rnaseq"]["tool_parameters"]["FASTQ_SCREEN_CONFIG2"]
+    fq_screen_paths = get_fastq_screen_paths(
+        [
+            os.path.join(sub_args.output, fqscreen_cfg1),
+            os.path.join(sub_args.output, fqscreen_cfg2),
+        ]
+    )
+    kraken_db_path = [config["bin"]["rnaseq"]["tool_parameters"]["KRAKENBACDB"]]
+    genome_bind_paths = resolve_additional_bind_paths(
+        list(config["references"]["rnaseq"].values()) + fq_screen_paths + kraken_db_path
+    )
+    all_bind_paths = "{},{}".format(",".join(genome_bind_paths), rawdata_bind_paths)
+
+    if sub_args.dry_run:  # print singularity bind paths and exit
+        print("\nSingularity Bind Paths:{}".format(all_bind_paths))
+        # end at dry run
+    else:  # continue with real run
+        # Run pipeline
+        masterjob = orchestrate(
+            mode=sub_args.mode,
+            outdir=sub_args.output,
+            additional_bind_paths=all_bind_paths,
+            alt_cache=sub_args.singularity_cache,
+            threads=sub_args.threads,
+            tmp_dir=get_tmp_dir(sub_args.tmp_dir, sub_args.output),
+            wait=wait,
+            hpcname=hpcname,
+        )
+
+        # Wait for subprocess to complete,
+        # this is blocking
+        masterjob.wait()
+
+        # Relay information about submission
+        # of the master job or the exit code of the
+        # pipeline that ran in local mode
+        if sub_args.mode == "local":
+            if int(masterjob.returncode) == 0:
+                print("{} pipeline has successfully completed".format("RENEE"))
+            else:
+                fatal(
+                    "{} pipeline failed. Please see standard output for more information.".format(
+                        "RENEE"
+                    )
+                )
+        elif sub_args.mode == "slurm":
+            jobid = (
+                open(os.path.join(sub_args.output, "logfiles", "mjobid.log"))
+                .read()
+                .strip()
+            )
+            if int(masterjob.returncode) == 0:
+                print("Successfully submitted master job: ", end="")
+            else:
+                fatal(
+                    "Error occurred when submitting the master job. Error code = {}".format(
+                        masterjob.returncode
+                    )
+                )
+            print(jobid)
+
+
+def resolve_additional_bind_paths(search_paths):
+    """Finds additional singularity bind paths from a list of random paths. Paths are
+    indexed with a composite key containing the first two directories of an absolute
+    file path to avoid issues related to shared names across the /gpfs shared network
+    filesystem. For each indexed list of file paths, a common path is found. Assumes
+    that the paths provided are absolute paths; the renee build sub command creates a
+    resource file index with absolute filenames.
+    @param search_paths list[]:
+        List of absolute file paths to find common bind paths from
+    @return common_paths list[]:
+        Returns a list of common shared file paths to create additional singularity bind paths
+    """
+    common_paths = []
+    indexed_paths = {}
+
+    for ref in search_paths:
+        # Skip over resources with remote URI and
+        # skip over strings that are not file PATHS as
+        # RENEE build creates absolute resource PATHS
+        if (
+            ref.lower().startswith("sftp://")
+            or ref.lower().startswith("s3://")
+            or ref.lower().startswith("gs://")
+            or not ref.lower().startswith(os.sep)
+        ):
+            continue
+
+        # Break up path into directory tokens
+        for r in [
+            ref,
+            str(pathlib.Path(ref).resolve()),
+        ]:  # taking care of paths which are symlinks!
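+            # Illustration (hypothetical path): '/gpfs/gsfs10/users/CCBR/ref.fa'
+            # splits into ['', 'gpfs', 'gsfs10', 'users', 'CCBR', 'ref.fa'], so the
+            # composite index built below is the tuple ('gpfs', 'gsfs10')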
+            path_list = os.path.abspath(r).split(os.sep)
+
+            try:  # Create composite index from first two directories
+                # Avoids issues created by shared /gpfs/ PATHS
+                index = path_list[1:3]
+                index = tuple(index)
+            except IndexError:
+                index = path_list[1]  # ref startswith /
+            if index not in indexed_paths:
+                indexed_paths[index] = []
+            # Create an INDEX to find common PATHS for each root child directory
+            # like /scratch or /data. This prevents issues when trying to find the
+            # common path between these two different directories (resolves to /)
+            indexed_paths[index].append(str(os.sep).join(path_list))
+
+    for index, paths in indexed_paths.items():
+        # Find common paths for each path index
+        common_paths.append(os.path.dirname(os.path.commonprefix(paths)))
+
+    return list(set(common_paths))
+
+
+def get_fastq_screen_paths(fastq_screen_confs, match="DATABASE", file_index=-1):
+    """Parses fastq_screen.conf files to get the paths of each fastq_screen database.
+    This path contains bowtie2 indices for reference genome to screen against.
+    The paths are added as singularity bind points.
+    @param fastq_screen_confs list[]:
+        Name of fastq_screen config files to parse
+    @param match :
+        Keyword to indicate a line match [default: 'DATABASE']
+    @param file_index :
+        Index of the field containing the fastq_screen database path
+    @return list[]:
+        Returns a list of fastq_screen database paths
+    """
+    databases = []
+    for file in fastq_screen_confs:
+        with open(file, "r") as fh:
+            for line in fh:
+                if line.startswith(match):
+                    db_path = line.strip().split()[file_index]
+                    databases.append(db_path)
+    return databases
+
+
+def run_in_context(args):
+    """Execute the run function in a context manager to capture stdout/stderr"""
+    with contextlib.redirect_stdout(io.StringIO()) as out_f, contextlib.redirect_stderr(
+        io.StringIO()
+    ) as err_f:
+        run(args)
+    allout = out_f.getvalue() + "\n" + err_f.getvalue()
+    return allout
diff --git a/src/renee/setup.py b/src/renee/setup.py
new file mode 100644
index 0000000..000e3c8
--- /dev/null
+++ b/src/renee/setup.py
@@ -0,0 +1,319 @@
+import os
+import json
+import re
+import subprocess
+import sys
+
+from .util import (
+    get_hpcname,
+    get_version,
+    get_tmp_dir,
+)
+from .cache import image_cache
+
+
+def setup(sub_args, ifiles, repo_path, output_path):
+    """Setup the pipeline for execution and create the config file from templates
+    @param sub_args :
+        Parsed arguments for run sub-command
+    @param repo_path :
+        Path to RENEE source code and its templates
+    @param output_path :
+        Pipeline output path, created if it does not exist
+    @return config :
+        Config dictionary containing metadata to run the pipeline
+    """
+    # Resolves PATH to template for genomic reference files to select from a
+    # bundled reference genome or a user generated reference genome built via
+    # renee build subcommand
+    hpcname = get_hpcname()
+    if hpcname == "biowulf":
+        print("Thank you for running RENEE on BIOWULF!")
+        genome_config = os.path.join(
+            output_path, "config", "genomes", hpcname, sub_args.genome + ".json"
+        )
+    elif hpcname == "frce":
+        print("Thank you for running RENEE on FRCE!")
+        genome_config = os.path.join(
+            output_path, "config", "genomes", hpcname, sub_args.genome + ".json"
+        )
+    else:
+        genome_config = os.path.join(
+            output_path, "config", "genomes", sub_args.genome + ".json"
+        )
+    if sub_args.genome.endswith(".json"):
+        # Provided a custom reference genome generated by renee build
+        genome_config = os.path.abspath(sub_args.genome)
+
+    required = {
+        # Template for project-level information
+        "project": os.path.join(output_path, "config", "templates", "project.json"),
+        # Template for genomic reference files
+        # User provided argument --genome is used to select the template
+        "genome": genome_config,
+        # Template for tool information
+        "tools": os.path.join(output_path, "config", "templates", "tools.json"),
+    }
+
+    # Global config file for pipeline, config.json
+    config = join_jsons(required.values())  # uses templates in the renee repo
+    # Update cluster-specific paths for fastq screen & kraken db
+    if hpcname == "biowulf" or hpcname == "frce":
+        db_json_filename = os.path.join(
+            output_path, "config", "templates", f"dbs_{hpcname}.json"
+        )
+        with open(
+            os.path.join(os.path.dirname(os.path.abspath(__file__)), db_json_filename),
+            "r",
+        ) as json_file:
+            config["bin"]["rnaseq"]["tool_parameters"].update(json.load(json_file))
+
+    config = add_user_information(config)
+    config = add_rawdata_information(sub_args, config, ifiles)
+
+    # Resolves if an image needs to be pulled from an OCI registry or
+    # a local SIF generated from the renee cache subcommand exists
+    config = image_cache(sub_args, config)
+
+    # Add other cli collected info
+    config["project"]["annotation"] = sub_args.genome
+    config["project"]["version"] = get_version()
+    config["project"]["pipelinehome"] = os.path.dirname(__file__)
+    config["project"]["workpath"] = os.path.abspath(sub_args.output)
+    genome_annotation = sub_args.genome
+    config["project"]["organism"] = genome_annotation.split("_")[0]
+
+    # Add optional cli workflow steps
+    config["options"] = {}
+    config["options"]["star_2_pass_basic"] = sub_args.star_2_pass_basic
+    config["options"]["small_rna"] = sub_args.small_rna
+    config["options"]["tmp_dir"] = get_tmp_dir(sub_args.tmp_dir, output_path)
+    config["options"]["shared_resources"] = sub_args.shared_resources
+    if sub_args.wait:
+        config["options"]["wait"] = "True"
+    else:
+        config["options"]["wait"] = "False"
+    if sub_args.create_nidap_folder:
+        config["options"]["create_nidap_folder"] = "True"
+    else:
+        config["options"]["create_nidap_folder"] = "False"
+
+    # Get latest git commit hash
+    git_hash = get_repo_git_commit_hash(repo_path)
+    config["project"]["git_commit_hash"] = git_hash
+
+    if sub_args.shared_resources:
+        # Update paths to shared resources directory
+        config["bin"]["rnaseq"]["tool_parameters"]["KRAKENBACDB"] = os.path.join(
+            sub_args.shared_resources, "20180907_standard_kraken2"
+        )
+
+    # Save config to output directory
+    print(
+        "\nGenerating config file in '{}'... ".format(
+            os.path.join(output_path, "config.json")
+        ),
+        end="",
+    )
+    with open(os.path.join(output_path, "config.json"), "w") as fh:
+        json.dump(config, fh, indent=4, sort_keys=True)
+    print("Done!")
+
+    return config
+
+
+def add_user_information(config):
+    """Adds username and user's home directory to config.
+    @params config :
+        Config dictionary containing metadata to run pipeline
+    @return config :
+        Updated config dictionary containing user information (username and home directory)
+    """
+    # Get PATH to user's home directory
+    # Method is portable across unix-like OS and Windows
+    home = os.path.expanduser("~")
+
+    # Get username from home directory PATH
+    username = os.path.split(home)[-1]
+
+    # Update config with home directory and username
+    config["project"]["userhome"] = home
+    config["project"]["username"] = username
+
+    return config
+
+
+def add_rawdata_information(sub_args, config, ifiles):
+    """Adds information about the rawdata provided to the pipeline.
+    Determines whether the dataset is paired-end or single-end and finds the set of all
+    rawdata directories (needed for the -B option when running singularity). If a user
+    provides paired-end data, checks to see if both mates (R1 and R2) are present for each sample.
+    @param sub_args :
+        Parsed arguments for run sub-command
+    @params ifiles list[]:
+        List containing pipeline input files (renamed symlinks)
+    @params config :
+        Config dictionary containing metadata to run pipeline
+    @return config :
+        Updated config dictionary containing information about the raw data
+    """
+    # Determine whether dataset is paired-end or single-end
+    # Updates config['project']['nends']: 1 = single-end, 2 = paired-end
+    nends = get_nends(ifiles)  # Checks PE data for both mates (R1 and R2)
+    config["project"]["nends"] = nends
+
+    # Finds the set of rawdata directories to bind
+    rawdata_paths = get_rawdata_bind_paths(input_files=sub_args.input)
+    config["project"]["datapath"] = ",".join(rawdata_paths)
+
+    # Add each sample's basename, label and group info
+    config = add_sample_metadata(input_files=ifiles, config=config)
+
+    return config
+
+
+def get_nends(ifiles):
+    """Determines whether the dataset is paired-end or single-end.
+    If paired-end data, checks to see if both mates (R1 and R2) are present for each sample.
+    If single-end, nends is set to 1. Else if paired-end, nends is set to 2.
+    @params ifiles list[]:
+        List containing pipeline input files (renamed symlinks)
+    @return nends_status :
+        Integer reflecting nends status: 1 = se, 2 = pe
+    """
+    # Determine if dataset contains paired-end data
+    paired_end = False
+    nends_status = 1
+    for file in ifiles:
+        if file.endswith(".R2.fastq.gz"):
+            paired_end = True
+            nends_status = 2
+            break  # dataset is paired-end
+
+    # Check to see if both mates (R1 and R2) are present for paired-end data
+    if paired_end:
+        nends = {}  # keep count of R1 and R2 for each sample
+        for file in ifiles:
+            # Split sample name on file extension
+            sample = re.split(r"\.R[12]\.fastq\.gz", os.path.basename(file))[0]
+            if sample not in nends:
+                nends[sample] = 0
+
+            nends[sample] += 1
+
+        # Check if samples contain both read mates
+        missing_mates = [sample for sample, count in nends.items() if count == 1]
+        if missing_mates:
+            # Missing an R1 or R2 for a provided input sample
+            raise NameError(
+                """\n\tFatal: Detected paired-end data but user failed to provide
+                both mates (R1 and R2) for the following samples:\n\t\t{}\n
+                Please check that the basename for each sample is consistent across mates.
+                Here is an example of a consistent basename across mates:
+                  consistent_basename.R1.fastq.gz
+                  consistent_basename.R2.fastq.gz
+
+                Please do not run the pipeline with a mixture of single-end and paired-end
+                samples. This feature is currently not supported within {}, and it is
+                not recommended either. If this is a priority for your project, please run
+                paired-end samples and single-end samples separately (in two separate output directories).
+                If you feel like this functionality should exist, feel free to open an issue on Github.
+                """.format(
+                    missing_mates, sys.argv[0]
+                )
+            )
+
+    return nends_status
+
+
+def get_rawdata_bind_paths(input_files):
+    """
+    Gets rawdata bind paths of user provided fastq files.
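+
+    Example (hypothetical inputs, for illustration only):
+        get_rawdata_bind_paths(['/data/proj/a.R1.fastq.gz', '/data/proj/a.R2.fastq.gz'])
+        # returns ['/data/proj'] -- one deduplicated parent directory per input file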
+    @params input_files list[]:
+        List containing user-provided input fastq files
+    @return bindpaths list[]:
+        Deduplicated list of rawdata bind paths
+    """
+    bindpaths = []
+    for file in input_files:
+        # Get directory of input file
+        rawdata_src_path = os.path.dirname(os.path.abspath(os.path.realpath(file)))
+        if rawdata_src_path not in bindpaths:
+            bindpaths.append(rawdata_src_path)
+
+    return bindpaths
+
+
+def add_sample_metadata(input_files, config, group=None):
+    """Adds sample metadata such as sample basename, label, and group information.
+    If a sample sheet is provided, it will default to using information in that file.
+    If no sample sheet is provided, it will only add sample basenames and labels.
+    @params input_files list[]:
+        List containing pipeline input fastq files
+    @params config :
+        Config dictionary containing metadata to run pipeline
+    @params group :
+        Sample sheet containing basename, group, and label for each sample
+    @return config :
+        Updated config with basenames, labels, and groups (if provided)
+    """
+    # TODO: Add functionality for basecase when user has samplesheet
+    added = []
+    for file in input_files:
+        # Split sample name on file extension
+        sample = re.split(r"\.R[12]\.fastq\.gz", os.path.basename(file))[0]
+        if sample not in added:
+            # Only add PE sample information once
+            added.append(sample)
+            config["project"]["groups"]["rsamps"].append(sample)
+            config["project"]["groups"]["rgroups"].append(sample)
+            config["project"]["groups"]["rlabels"].append(sample)
+
+    return config
+
+
+def join_jsons(templates):
+    """Joins multiple JSON files into one data structure.
+    Used to join multiple template JSON files to create a global config dictionary.
+    @params templates :
+        List of template JSON files to join together
+    @return aggregated :
+        Dictionary containing the contents of all the input JSON files
+    """
+    # Get absolute PATH to templates in renee git repo
+    repo_path = os.path.dirname(os.path.abspath(__file__))
+    aggregated = {}
+
+    for file in templates:
+        with open(os.path.join(repo_path, file), "r") as fh:
+            aggregated.update(json.load(fh))
+
+    return aggregated
+
+
+def get_repo_git_commit_hash(repo_path):
+    """Gets the git commit hash of the RENEE repo.
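+
+    Example (illustrative output only):
+        get_repo_git_commit_hash('/opt/RENEE')
+        # returns e.g. the hash from `git rev-parse HEAD`, or the fallback
+        # string 'github_release' when the repo has no .git directory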
+    @param repo_path :
+        Path to RENEE git repo
+    @return githash :
+        Latest git commit hash
+    """
+    try:
+        githash = (
+            subprocess.check_output(
+                ["git", "rev-parse", "HEAD"], stderr=subprocess.STDOUT, cwd=repo_path
+            )
+            .strip()
+            .decode("utf-8")
+        )
+        # Typecast to fix python3 TypeError (Object of type bytes is not JSON serializable)
+        # subprocess.check_output() returns a byte string
+        githash = str(githash)
+    except Exception as e:
+        # Github releases are missing the .git directory,
+        # meaning you cannot get a commit hash; set the
+        # commit hash to indicate it's from a GH release
+        githash = "github_release"
+
+    return githash
diff --git a/src/renee/util.py b/src/renee/util.py
new file mode 100644
index 0000000..0ffad74
--- /dev/null
+++ b/src/renee/util.py
@@ -0,0 +1,324 @@
+import datetime
+import glob
+import os
+import subprocess
+import shutil
+import sys
+import warnings
+from .cache import get_singularity_cachedir
+
+
+def renee_base(rel_path=""):
+    """Get the absolute path to a file in the RENEE repository
+    @return abs_path
+    """
+    basedir = os.path.dirname(
+        os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
+    )
+    return os.path.join(basedir, rel_path)
+
+
+def get_version():
+    """Get the current RENEE version
+    @return version
+    """
+    with open(renee_base("VERSION"), "r") as vfile:
+        version = f"v{vfile.read().strip()}"
+    return version
+
+
+def scontrol_show():
+    """Run scontrol show config and parse the output as a dictionary
+    @return scontrol_dict :
+    """
+    scontrol_dict = dict()
+    scontrol_out = subprocess.run(
+        "scontrol show config", shell=True, capture_output=True, text=True
+    ).stdout
+    if len(scontrol_out) > 0:
+        for line in scontrol_out.split("\n"):
+            line_split = line.split("=")
+            if len(line_split) > 1:
+                scontrol_dict[line_split[0].strip()] = line_split[1].strip()
+    return scontrol_dict
+
+
+def get_hpcname():
+    """Get the HPC name (biowulf, frce, or an empty string)
+    @return hpcname
+    """
+    scontrol_out = scontrol_show()
+    hpc = scontrol_out["ClusterName"] if "ClusterName" in scontrol_out.keys() else ""
+    if hpc == "fnlcr":
+        hpc = "frce"
+    return hpc
+
+
+def get_tmp_dir(tmp_dir, outdir, hpc=get_hpcname()):
+    """Get the default temporary directory for biowulf and frce. Allow user override."""
+    if not tmp_dir:
+        if hpc == "biowulf":
+            tmp_dir = "/lscratch/$SLURM_JOBID"
+        elif hpc == "frce":
+            tmp_dir = outdir
+        else:
+            tmp_dir = None
+    return tmp_dir
+
+
+def get_shared_resources_dir(shared_dir, hpc=get_hpcname()):
+    """Get the default shared resources directory for biowulf and frce. Allow user override."""
+    if not shared_dir:
+        if hpc == "biowulf":
+            shared_dir = (
+                "/data/CCBR_Pipeliner/Pipelines/RENEE/resources/shared_resources"
+            )
+        elif hpc == "frce":
+            shared_dir = "/mnt/projects/CCBR-Pipelines/pipelines/RENEE/resources/shared_resources"
+    return shared_dir
+
+
+def get_genomes_list(hpcname=get_hpcname(), error_on_warnings=False):
+    """Get list of genome annotations available for the current platform
+    @return genomes_list
+    """
+    return sorted(
+        list(
+            get_genomes_dict(
+                hpcname=hpcname, error_on_warnings=error_on_warnings
+            ).keys()
+        )
+    )
+
+
+def get_genomes_dict(hpcname=get_hpcname(), error_on_warnings=False):
+    """Get a dictionary of genome annotation versions and the paths to the corresponding JSON files
+    @return genomes_dict { genome_name: json_file_path }
+    """
+    if error_on_warnings:
+        warnings.filterwarnings("error")
+    genomes_dir = renee_base(os.path.join("config", "genomes", hpcname))
+    if not os.path.exists(genomes_dir):
+        warnings.warn(f"Folder does not exist: {genomes_dir}")
+    search_term = genomes_dir + "/*.json"
+    json_files = glob.glob(search_term)
+    if len(json_files) == 0:
+        warnings.warn(
+            f"No Genome+Annotation JSONs found in {genomes_dir}. Please specify a custom genome json file with `--genome`"
+        )
+    genomes_dict = {
+        os.path.basename(json_file).replace(".json", ""): json_file
+        for json_file in json_files
+    }
+    warnings.resetwarnings()
+    return genomes_dict
+
+
+def check_python_version():
+    # version check
+    # glob.iglob requires 3.11 for using "include_hidden=True"
+    MIN_PYTHON = (3, 11)
+    try:
+        assert sys.version_info >= MIN_PYTHON
+        print(
+            "Python version: {0}.{1}.{2}".format(
+                sys.version_info.major, sys.version_info.minor, sys.version_info.micro
+            )
+        )
+    except AssertionError:
+        exit(
+            f"{sys.argv[0]} requires Python {'.'.join([str(n) for n in MIN_PYTHON])} or newer"
+        )
+
+
+def _cp_r_safe_(
+    source, target, resources=["workflow", "resources", "config"], safe_mode=True
+):
+    """Private function: Given a list of paths, it will recursively copy each to the
+    target location. If a target path already exists, it will not overwrite the
+    existing path's data when `safe_mode` is on.
+    @param resources :
+        List of paths to copy over to target location.
+        Default: ["workflow", "resources", "config"]
+    @params source :
+        Add a prefix PATH to each resource
+    @param target :
+        Target path to copy templates and required resources (aka destination)
+    @param safe_mode :
+        Only copy the resources to the target path
+        if they do not exist in the target path (default: True)
+    """
+    for resource in resources:
+        destination = os.path.join(target, resource)
+        if os.path.exists(destination) and safe_mode:
+            print(f"🚫 path exists and `safe_mode` is ON, not copying: {destination}")
+        else:
+            # Required resources do not exist, or safe mode is off
+            shutil.copytree(
+                os.path.join(source, resource), destination, dirs_exist_ok=not safe_mode
+            )
+
+
+def orchestrate(
+    mode,
+    outdir,
+    additional_bind_paths,
+    alt_cache,
+    threads=2,
+    submission_script="runner",
+    masterjob="pl:renee",
+    tmp_dir=None,
+    wait="",
+    hpcname="",
+):
+    """Runs the RENEE pipeline via the selected executor: local or slurm.
+    If 'local' is selected, the pipeline is executed locally on a compute node/instance.
+    If 'slurm' is selected, jobs will be submitted to the cluster using the SLURM job scheduler.
+    Support for additional job schedulers (i.e. PBS, SGE, LSF) may be added in the future.
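+
+    Example (hypothetical values, for illustration only):
+        masterjob = orchestrate(mode="slurm", outdir="/data/project_out",
+                                additional_bind_paths="/data/rawdata", alt_cache=None)
+        masterjob.wait()  # orchestrate() returns the master job as a subprocess.Popen object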
+    @param outdir :
+        Pipeline output PATH
+    @param mode :
+        Execution method or mode:
+            local runs serially on a compute instance without submitting to the cluster.
+            slurm will submit jobs to the cluster using the SLURM job scheduler.
+    @param additional_bind_paths :
+        Additional paths to bind to the container filesystem (i.e. input file paths)
+    @param alt_cache :
+        Alternative singularity cache location
+    @param threads :
+        Number of threads to use for the local execution method
+    @param submission_script :
+        Path to the master job submission script:
+            renee run   = /path/to/output/resources/runner
+            renee build = /path/to/output/resources/builder
+    @param masterjob :
+        Name of the master job
+    @param tmp_dir :
+        Absolute PATH to the temp dir for the compute node
+    @param wait :
+        "--wait" to wait for the master job to finish. This waits when the pipeline is called via the NIDAP API
+    @param hpcname :
+        "biowulf" if run on biowulf, "frce" if run on frce, blank otherwise. hpcname is determined in setup()
+    @return masterjob :
+    """
+    # Add additional singularity bind PATHs
+    # to mount the local filesystem to the
+    # container's filesystem, NOTE: these
+    # PATHs must be absolute PATHs
+    outdir = os.path.abspath(outdir)
+    # Add any default PATHs to bind to
+    # the container's filesystem, like
+    # tmp directories, /lscratch
+    addpaths = []
+    # set tmp_dir depending on hpc
+    tmp_dir = get_tmp_dir(tmp_dir, outdir)
+    temp = os.path.dirname(tmp_dir.rstrip("/"))
+    if temp == os.sep:
+        temp = tmp_dir.rstrip("/")
+    if outdir not in additional_bind_paths.split(","):
+        addpaths.append(outdir)
+    if temp not in additional_bind_paths.split(","):
+        addpaths.append(temp)
+    bindpaths = ",".join(addpaths)
+
+    # Set ENV variable 'SINGULARITY_CACHEDIR'
+    # to output directory
+    my_env = {}
+    my_env.update(os.environ)
+
+    cache = get_singularity_cachedir(output_dir=outdir, cache_dir=alt_cache)
+    my_env["SINGULARITY_CACHEDIR"] = cache
+
+    if additional_bind_paths:
+        # Add Bind PATHs for outdir and tmp dir
+        if bindpaths:
+            bindpaths = ",{}".format(bindpaths)
+        bindpaths = "{}{}".format(additional_bind_paths, bindpaths)
+
+    if not os.path.exists(os.path.join(outdir, "logfiles")):
+        # Create directory for logfiles
+        os.makedirs(os.path.join(outdir, "logfiles"))
+
+    if os.path.exists(os.path.join(outdir, "logfiles", "snakemake.log")):
+        mtime = _get_file_mtime(os.path.join(outdir, "logfiles", "snakemake.log"))
+        newname = os.path.join(outdir, "logfiles", "snakemake." + str(mtime) + ".log")
+        os.rename(os.path.join(outdir, "logfiles", "snakemake.log"), newname)
+
+    # Create .singularity directory for installations of snakemake
+    # without setuid which create a sandbox in the SINGULARITY_CACHEDIR
+    if not os.path.exists(cache):
+        # Create directory for sandbox and image layers
+        os.makedirs(cache)
+
+    # Run on compute node or instance without submitting jobs to a scheduler
+    if mode == "local":
+        # Run RENEE: instantiate main/master process
+        # Look into later: it may be worth replacing Popen subprocess with a direct
+        # snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html
+        # Create log file for pipeline
+        logfh = open(os.path.join(outdir, "logfiles", "snakemake.log"), "w")
+        masterjob = subprocess.Popen(
+            [
+                "snakemake",
+                "-pr",
+                "--use-singularity",
+                "--singularity-args",
+                "'-B {}'".format(bindpaths),
+                "--cores",
+                str(threads),
+                "--configfile=config.json",
+            ],
+            cwd=outdir,
+            env=my_env,
+        )
+
+    # Submitting jobs to cluster via SLURM's job scheduler
+    elif mode == "slurm":
+        # Run RENEE: instantiate main/master process
+        # Look into later: it may be worth replacing Popen subprocess with a direct
+        # snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html
+        # snakemake --latency-wait 120 -s $R/Snakefile -d $R --printshellcmds
+        # --cluster-config $R/cluster.json --keep-going --restart-times 3
+        # --cluster "sbatch --gres {cluster.gres} --cpus-per-task {cluster.threads} -p {cluster.partition} -t {cluster.time} --mem {cluster.mem} --job-name={params.rname}"
+        # -j 500 --rerun-incomplete --stats $R/Reports/initialqc.stats -T
+        # 2>&1 | tee -a $R/Reports/snakemake.log
+
+        # Create log file for master job information
+        logfh = open(os.path.join(outdir, "logfiles", "master.log"), "w")
+        # submission_script for renee run is /path/to/output/resources/runner
+        # submission_script for renee build is /path/to/output/resources/builder
+        cmdlist = [
+            str(os.path.join(outdir, "resources", str(submission_script))),
+            mode,
+            "-j",
+            str(masterjob),
+            "-b",
+            str(bindpaths),
+            "-o",
+            str(outdir),
+            "-c",
+            str(cache),
+            "-t",
+            str(tmp_dir),
+        ]
+        if str(wait) == "--wait":
+            cmdlist.append("-w")
+        if str(hpcname) != "":
+            cmdlist.append("-n")
+            cmdlist.append(hpcname)
+        else:
+            cmdlist.append("-n")
+            cmdlist.append("unknown")
+
+        print(" ".join(cmdlist))
+        masterjob = subprocess.Popen(
+            cmdlist, cwd=outdir, stderr=subprocess.STDOUT, stdout=logfh, env=my_env
+        )
+
+    return masterjob
+
+
+def _get_file_mtime(f):
+    timestamp = datetime.datetime.fromtimestamp(os.path.getmtime(os.path.abspath(f)))
+    mtime = timestamp.strftime("%y%m%d%H%M%S")
+    return mtime
diff --git a/tests/test_build.py b/tests/test_build.py
index 4bfeaf5..9075769 100644
--- a/tests/test_build.py
+++ b/tests/test_build.py
@@ -4,45 +4,13 @@
 import pathlib
 import tempfile
 
-from renee.src.renee.__main__ import _cp_r_safe_
+from renee.src.renee.__main__ import build
 
 renee_build = (
-    "src/renee/__main__.py build "
+    "./bin/renee build "
     "--dry-run "
     "--ref-name test "
    "--ref-fa .tests/KO_S3.R1.fastq.gz "
    "--ref-gtf .tests/KO_S3.R1.fastq.gz "
    "--gtf-ver 0 "
 )
-
-RENEE_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-
-
-def test_cp_safe():
-    with tempfile.TemporaryDirectory() as tmp_dir:
-        outdir = os.path.join(tmp_dir, "testout")
-        os.makedirs(os.path.join(outdir, "config"))
-        pathlib.Path(os.path.join(outdir, "config", "tmp.txt")).touch()
-        with contextlib.redirect_stdout(io.StringIO()) as stdout:
-            _cp_r_safe_(
-                source=RENEE_PATH,
-                target=outdir,
-                resources=["config"],
-                safe_mode=True,
-            )
-        assert "path exists and `safe_mode` is ON, not copying" in stdout.getvalue()
-
-
-def test_cp_unsafe():
-    with tempfile.TemporaryDirectory() as tmp_dir:
-        outdir = os.path.join(tmp_dir, "testout")
-        configdir = os.path.join(outdir, "config")
-        os.makedirs(configdir)
-        pathlib.Path(os.path.join(configdir, "tmp.txt")).touch()
-        with contextlib.redirect_stdout(io.StringIO()) as stdout:
-            _cp_r_safe_(
-                source=RENEE_PATH,
-                target=outdir,
-                resources=["config"],
-                safe_mode=False,
-            )
-        assert not stdout.getvalue() and "config.yaml" in os.listdir(configdir)
diff --git a/tests/test_cache.py b/tests/test_cache.py
index 384c9c4..846929f 100644
--- a/tests/test_cache.py
+++ b/tests/test_cache.py
@@ -3,8 +3,10 @@
 import os.path
 import subprocess
 
+from renee.src.renee.cache import get_sif_cache_dir, get_singularity_cachedir
+
 renee_run = (
-    "src/renee/__main__.py run "
+    "./bin/renee run "
     "--mode local --runmode init --dry-run "
     "--input .tests/*.fastq.gz "
     "--genome config/genomes/biowulf/hg38_30.json "
@@ -42,7 +44,24 @@ def test_cache_sif():
 def test_cache_nosif():
     output, config = run_in_temp(f"{renee_run}")
     assertions = [
-        config["images"]["arriba"] == "docker://nciccbr/ccbr_arriba_2.0.0:v0.0.1",
-        "The singularity command has to be available" in output.stderr,
+        config["images"]["arriba"] == "docker://nciccbr/ccbr_arriba_2.0.0:v0.0.1"
     ]
     assert all(assertions)
+
+
+def test_get_sif_cache_dir():
+    assertions = [
+        "'CCBR_Pipeliner/SIFS' in get_sif_cache_dir('biowulf')",
+        "'CCBR-Pipelines/SIFs' in get_sif_cache_dir('frce')",
+    ]
+    errors = [assertion for assertion in assertions if not eval(assertion)]
+    assert not errors, "errors occurred:\n{}".format("\n".join(errors))
+
+
+def test_get_singularity_cachedir():
+    assertions = [
+        "get_singularity_cachedir('outdir') == 'outdir/.singularity'",
+        "get_singularity_cachedir('outdir', 'cache') == 'cache'",
+    ]
+    errors = [assertion for assertion in assertions if not eval(assertion)]
+    assert not errors, "errors occurred:\n{}".format("\n".join(errors))
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 1c8dd47..c9b6a7a 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -3,7 +3,7 @@
 from renee.src.renee.__main__ import main
 
 renee_run = (
-    "src/renee/__main__.py run "
+    "./bin/renee run "
     "--mode local --runmode init --dry-run "
     "--input .tests/*.fastq.gz "
     "--genome config/genomes/biowulf/hg38_30.json "
@@ -12,14 +12,14 @@
 
 def test_help():
     output = subprocess.run(
-        "src/renee/__main__.py --help", capture_output=True, shell=True, text=True
+        "./bin/renee --help", capture_output=True, shell=True, text=True
     ).stdout
     assert "RENEE" in output
 
 
 def test_version():
     output = subprocess.run(
-        "src/renee/__main__.py --version", capture_output=True, shell=True, text=True
+        "./bin/renee --version", capture_output=True, shell=True, text=True
     ).stdout
     assert "renee v" in output
 
@@ -38,7 +38,7 @@ def test_subcommands_help():
         [
             f"renee {cmd} [--help]"
             in subprocess.run(
-                f"src/renee/__main__.py {cmd} --help",
+                f"./bin/renee {cmd} --help",
                 capture_output=True,
                 shell=True,
                 text=True,
diff --git a/tests/test_run.py b/tests/test_run.py
new file mode 100644
index 0000000..229d40a
--- /dev/null
+++ b/tests/test_run.py
@@ -0,0 +1,43 @@
+import argparse
+import glob
+import os
+import tempfile
+
+from renee.src.renee.util import (
+    get_tmp_dir,
+    get_shared_resources_dir,
+    renee_base,
+)
+from renee.src.renee.cache import get_sif_cache_dir
+from renee.src.renee.run import run, run_in_context
+from renee.src.renee.util import get_hpcname
+
+
+def test_dryrun():
+    if get_hpcname() == "biowulf":
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            run_args = argparse.Namespace(
+                input=list(glob.glob(os.path.join(renee_base(".tests"), "*.fastq.gz"))),
+                output=tmp_dir,
+                genome=os.path.join(
+                    renee_base("config"), "genomes", "biowulf", "hg38_36.json"
+                ),
+                mode="slurm",
+                runmode="run",
+                dry_run=True,
+                sif_cache=get_sif_cache_dir(),
+                singularity_cache=os.environ["SINGULARITY_CACHEDIR"],
+                tmp_dir=tmp_dir,
+                shared_resources=get_shared_resources_dir(None),
+                star_2_pass_basic=False,
+                small_rna=False,
+                create_nidap_folder=False,
+                wait=False,
+                threads=2,
+            )
+            # execute dry run and capture stdout/stderr
+            allout = run_in_context(run_args)
+            assert (
+                "This was a dry-run (flag -n). The order of jobs does not reflect the order of execution."
+                in allout
+            )
diff --git a/tests/test_util.py b/tests/test_util.py
new file mode 100644
index 0000000..d344e21
--- /dev/null
+++ b/tests/test_util.py
@@ -0,0 +1,77 @@
+import contextlib
+import io
+import os
+import pathlib
+import pytest
+import tempfile
+import warnings
+
+from renee.src.renee.util import (
+    renee_base,
+    _cp_r_safe_,
+    get_genomes_dict,
+    get_genomes_list,
+)
+
+
+def test_renee_base():
+    renee_bin = renee_base(os.path.join("bin", "renee"))
+    assert renee_bin.endswith("/bin/renee") and os.path.exists(renee_bin)
+
+
+def test_cp_safe():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        outdir = os.path.join(tmp_dir, "testout")
+        os.makedirs(os.path.join(outdir, "config"))
+        pathlib.Path(os.path.join(outdir, "config", "tmp.txt")).touch()
+        with contextlib.redirect_stdout(io.StringIO()) as stdout:
+            _cp_r_safe_(
+                source=renee_base(),
+                target=outdir,
+                resources=["config"],
+                safe_mode=True,
+            )
+        assert "path exists and `safe_mode` is ON, not copying" in stdout.getvalue()
+
+
+def test_cp_unsafe():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        outdir = os.path.join(tmp_dir, "testout")
+        configdir = os.path.join(outdir, "config")
+        os.makedirs(configdir)
+        pathlib.Path(os.path.join(configdir, "tmp.txt")).touch()
+        with contextlib.redirect_stdout(io.StringIO()) as stdout:
+            _cp_r_safe_(
+                source=renee_base(),
+                target=outdir,
+                resources=["config"],
+                safe_mode=False,
+            )
+        assert not stdout.getvalue() and "config.yaml" in os.listdir(configdir)
+
+
+def test_get_genomes_warnings():
+    with warnings.catch_warnings(record=True) as raised_warnings:
+        genomes = get_genomes_list(hpcname="notAnOption")
+    assertions = [
+        "len(genomes) == 0",
+        "len(raised_warnings) == 2",
+        "raised_warnings[0].category is UserWarning",
+        "raised_warnings[1].category is UserWarning",
+        '"Folder does not exist" in str(raised_warnings[0].message)',
+        '"No Genome+Annotation JSONs found" in str(raised_warnings[1].message)',
+    ]
+    scope = locals()  # make local variables available to eval()
+    errors = [assertion for assertion in assertions if not eval(assertion, scope)]
+    assert not errors, "errors occurred:\n{}".format("\n".join(errors))
+
+
+def test_get_genomes_error():
+    with pytest.raises(UserWarning) as exception_info:
+        get_genomes_list(hpcname="notAnOption", error_on_warnings=True)
+    assert "Folder does not exist" in str(exception_info.value)
+
+
+def test_get_genomes_biowulf():
+    genomes_dict = get_genomes_dict(hpcname="biowulf")
+    assert len(genomes_dict) > 10
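
For reference, a minimal runnable sketch (not part of the diff above) of the FastQ
renaming convention implemented by rename() in src/renee/initialize.py; the example
filenames, expected outputs, and the sketch_rename helper are illustrative assumptions
based on the patterns shown in the diff:

    import re

    # subset of the rename() patterns; the named group "lane" keeps lane info (e.g. "001")
    extensions = {
        r"\.R1\.(?P<lane>...)\.f(ast)?q\.gz$": ".R1.fastq.gz",
        r"_1\.f(ast)?q\.gz$": ".R1.fastq.gz",
    }

    def sketch_rename(filename):
        for regex, new_ext in extensions.items():
            matched = re.search(regex, filename)
            if matched:
                try:
                    # retain lane information when the pattern captures it
                    new_ext = "_{}{}".format(matched.group("lane"), new_ext)
                except IndexError:
                    pass  # pattern has no "lane" group
                return re.sub(regex, new_ext, filename)
        return filename

    print(sketch_rename("sample.R1.001.fastq.gz"))  # sample_001.R1.fastq.gz
    print(sketch_rename("sample_1.fq.gz"))          # sample.R1.fastq.gz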