diff --git a/config/cluster.json b/config/cluster.json index 02ef370..45bce7a 100644 --- a/config/cluster.json +++ b/config/cluster.json @@ -1,32 +1,60 @@ { "__default__": { "threads": 4, - "mem": "8g", + "mem": "8G", "partition": "norm", - "time": "0-04:00:00" + "time": "0-04:00:00", + "gres": "lscratch:64" + }, + "concat_reads": { + "threads": 28, + "mem": "16G", + "partition": "norm", + "time": "1-00:00:00", + "gres": "lscratch:400" + }, + "concat_rna_reads": { + "threads": 28, + "mem": "16G", + "partition": "norm", + "time": "1-00:00:00", + "gres": "lscratch:400" }, "metawrap_read_qc": { - "threads": 8, - "mem": "16g", + "threads": 16, + "mem": "32G", "partition": "norm", - "time": "0-04:00:00", - "gres": "None" + "time": "1-00:00:00", + "gres": "lscratch:400" + }, + "rna_read_qc": { + "threads": 16, + "mem": "32G", + "partition": "norm", + "time": "1-00:00:00", + "gres": "lscratch:400" }, "metawrap_genome_assembly": { - "threads": 24, - "mem": "128g", + "threads": 48, + "mem": "128G", "partition": "norm", - "time": "2-00:00:00" + "time": "5-00:00:00" }, "metawrap_tax_classification": { - "threads": 12, - "mem": "32g", - "partition": "quick", - "time": "0-04:00:00" - }, - "metawrap_assembly_binning": { - "threads": 16, - "mem": "64g", + "threads": 32, + "mem": "64G", + "partition": "norm", + "time": "5-00:00:00" + }, + "metawrap_binning": { + "threads": 32, + "mem": "64G", + "partition": "norm", + "time": "2-00:00:00" + }, + "derep_bins": { + "threads": 32, + "mem": "32G", "partition": "norm", "time": "2-00:00:00" } diff --git a/config/images.json b/config/images.json index 8fdfa95..428943f 100644 --- a/config/images.json +++ b/config/images.json @@ -1,10 +1,10 @@ { "images": { - "metawrap": "docker://rroutsong/metamorph_metawrap:0.0.2", + "metawrap": "docker://rroutsong/metamorph_metawrap:0.0.4", "metagenome": "docker://rroutsong/metamorph_metagenome:0.0.1" }, "containers": { - "metawrap": "/data/OpenOmics/SIFs/metamorph_metawrap_1.3.2.sif", + "metawrap": "/data/OpenOmics/SIFs/metamorph_metawrap_0.0.4.sif", "metagenome": "/data/OpenOmics/SIFs/metamorph_metagenome_0.0.1.sif" } } diff --git a/config/resources.json b/config/resources.json index c8932e4..284ec44 100644 --- a/config/resources.json +++ b/config/resources.json @@ -1,36 +1,46 @@ { - "binds": [ + "databases": [ { - "name": "KRAKEN_DB2", - "to": "$HOME/KRAKEN_DB2", - "from": "/data/OpenOmics/references/metamorph/kraken2/k2_pluspfp_08gb_20230605", - "mode": "ro" + "name": "KRAKEN2_DB", + "to": "/data2/KRAKEN_DB2", + "from": "/data/OpenOmics/references/metamorph/kraken2/k2_pluspfp_08gb_20240112", + "mode": "rw" }, { "name": "KRAKEN_DB", - "to": "$HOME/KRAKEN_DB", + "to": "/data2/KRAKEN_DB", "from": "/data/OpenOmics/references/metamorph/kraken/20171019_minikraken_8GB", "mode": "ro" }, { "name": "BMTAGGER_INDEX", - "to": "$HOME/BMTAGGER_DB", + "to": "/data2/BMTAGGER_DB", "from": "/data/OpenOmics/references/metamorph/BMTAGGER/BMTAGGER_INDEX", "mode": "ro" }, { "name": "BMTAGGER_INDEX", - "to": "$HOME/GTDBTK_DB", + "to": "/data2/GTDBTK_DB", "from": "/data/OpenOmics/references/metamorph/GTDBtk/release207_v2", "mode": "ro" }, { "name": "GUNC_DB", - "to": "$HOME/GUNC_DB", + "to": "/data2/GUNC_DB", "from": "/data/OpenOmics/references/metamorph/GUNC/gunc_1.0.5db", "mode": "ro" + }, { + "name": "NCBI_NT", + "to": "/data2/NCBI_NT_DB", + "from": "/data/OpenOmics/references/metamorph/NCBI_nt", + "mode": "rw" + }, { + "name": "NCBI_TAX", + "to": "/data2/NCBI_TAX_DB", + "from": "/data/OpenOmics/references/metamorph/taxonomy", + "mode": "rw" }, 
{ "name": "CHECKM_DB", - "to": "$HOME/checkm", + "to": "/data2/CHECKM_DB", "from": "/data/OpenOmics/references/metamorph/checkm", "mode": "rw" - }, { + }, { "name": "CHECKM_CONFIG", "to": "/opt/conda/envs/metawrap-env/lib/python2.7/site-packages/checkm/DATA_CONFIG", "from": "/data/OpenOmics/references/metamorph/checkm/DATA_CONFIG", diff --git a/docker/Dockerfile b/docker/Dockerfile deleted file mode 100644 index 7f40036..0000000 --- a/docker/Dockerfile +++ /dev/null @@ -1,17 +0,0 @@ -FROM mambaorg/micromamba - -# install sudo, update system pkgs -USER root -RUN apt-get update; apt-get upgrade; apt-get install -y sudo vim debianutils bash -RUN echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers -RUN echo 'routsongrm ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers - -# $MAMBA_USER from inherited image -RUN usermod -a -G sudo $MAMBA_USER -RUN passwd -d $MAMBA_USER - -# back to mamba user -USER $MAMBA_USER - -# setup up metagenome environment -COPY assets /assets diff --git a/docker/metawrap/Dockerfile b/docker/metawrap/Dockerfile new file mode 100644 index 0000000..620e1c1 --- /dev/null +++ b/docker/metawrap/Dockerfile @@ -0,0 +1,23 @@ +FROM condaforge/miniforge3:latest +RUN apt-get update; apt-get install -y -qq curl build-essential vim dos2unix bash python2.7 +RUN mamba create -y -n metawrap-env +RUN conda config --add channels defaults; conda config --add channels conda-forge; \ + conda config --add channels bioconda;conda config --add channels ursky +RUN mamba install -y --only-deps -c ursky -n metawrap-env metawrap-mg==1.3.2 +RUN cd /home; git clone https://github.com/rroutsong/metaWRAP.git; chmod -R 777 metaWRAP +RUN mamba run -n metawrap-env pip3 install drep +ENV PATH="/home/metaWRAP/bin:/opt/conda/envs/metawrap-env/bin:$PATH" +COPY docker/metawrap/config-metawrap /home/metaWRAP/bin/config-metawrap +COPY docker/metawrap/Dockerfile /Dockerfile +RUN mkdir /install; cd /install; wget https://carlowood.github.io/which/which-2.21.tar.gz; tar xvf which-2.21.tar.gz +RUN cd /install/which-2.21; ./configure; make && make install +RUN rm /usr/bin/which; ln -s /usr/local/bin/which /usr/bin/which +RUN md5sum Dockerfile > /Dockerfile.md5 +ADD docker/metawrap/mw /home/metaWRAP/bin/mw +RUN dos2unix /home/metaWRAP/bin/config-metawrap +RUN dos2unix /home/metaWRAP/bin/mw +RUN chmod a+rx /home/metaWRAP/bin/config-metawrap /home/metaWRAP/bin/mw +ENV BASH_ENV="/etc/bash.bashrc" +RUN echo ". /opt/conda/etc/profile.d/conda.sh && conda activate metawrap-env" >> /etc/bash.bashrc +RUN echo 'export PATH="/home/metaWRAP/bin:/opt/conda/envs/metawrap-env/bin:$PATH"' >> /etc/bash.bashrc +ENTRYPOINT ["/bin/bash"] diff --git a/docker/metawrap/config-metawrap_1.3.2 b/docker/metawrap/config-metawrap similarity index 75% rename from docker/metawrap/config-metawrap_1.3.2 rename to docker/metawrap/config-metawrap index 8d2e8bd..62bdc74 100755 --- a/docker/metawrap/config-metawrap_1.3.2 +++ b/docker/metawrap/config-metawrap @@ -1,19 +1,14 @@ -#!/bin/bash # Paths to custon pipelines and scripts of metaWRAP mw_path=$(which metawrap) bin_path=${mw_path%/*} SOFT=${bin_path}/metawrap-scripts PIPES=${bin_path}/metawrap-modules - # OPTIONAL databases (see 'Databases' section of metaWRAP README for details) # path to kraken standard databases -KRAKEN_DB=~/KRAKEN_DB -KRAKEN2_DB=~/KRAKEN_DB2 - +KRAKEN_DB=/data2/KRAKEN_DB +KRAKEN2_DB=/data2/KRAKEN_DB2 # path to indexed human (or other host) genome (see metaWRAP website for guide). 
This includes .bitmask and .srprism files -BMTAGGER_DB=~/BMTAGGER_DB - +BMTAGGER_DB=/data2/BMTAGGER_DB # paths to BLAST databases -BLASTDB=~/NCBI_NT_DB -TAXDUMP=~/NCBI_TAX_DB - +BLASTDB=/data2/NCBI_NT_DB +TAXDUMP=/data2/NCBI_TAX_DB \ No newline at end of file diff --git a/docker/metawrap/docker_1.3.0 b/docker/metawrap/docker_1.3.0 deleted file mode 100644 index 99f9d3d..0000000 --- a/docker/metawrap/docker_1.3.0 +++ /dev/null @@ -1 +0,0 @@ -FROM quay.io/biocontainers/metawrap-mg:1.3.0--hdfd78af_1 \ No newline at end of file diff --git a/docker/metawrap/docker_1.3.2 b/docker/metawrap/docker_1.3.2 deleted file mode 100644 index 02620a5..0000000 --- a/docker/metawrap/docker_1.3.2 +++ /dev/null @@ -1,18 +0,0 @@ -FROM condaforge/miniforge3:latest -RUN mamba create -y -n metawrap-env -RUN conda config --add channels defaults; conda config --add channels conda-forge; \ - conda config --add channels bioconda;conda config --add channels ursky -RUN echo "export PATH=\"$PATH:/home/metaWRAP/bin\"" >> /etc/bash.bashrc -# RUN mamba install -y -n metawrap-env biopython blas=2.5 blast=2.6.0 bmtagger bowtie2 bwa checkm-genome \ -# fastqc kraken=1.1 kraken=2.0 krona=2.7 matplotlib maxbin2 megahit metabat2 pandas \ -# prokka quast r-ggplot2 r-recommended salmon samtools=1.9 seaborn spades trim-galore -RUN mamba install -y --only-deps -c ursky -n metawrap-env metawrap-mg==1.3.2 -RUN cd /home; git clone https://github.com/bxlab/metaWRAP.git; chmod -R 777 metaWRAP -RUN sed '$d' /etc/skel/.bashrc; sed '$d' ~/.bashrc -RUN echo ". ${CONDA_DIR}/etc/profile.d/conda.sh && conda activate metawrap-env" >> /etc/skel/.bashrc && \ - echo ". ${CONDA_DIR}/etc/profile.d/conda.sh && conda activate metawrap-env" >> ~/.bashrc -ENV PATH="/home/metawrap/bin:$PATH" -COPY docker/metawrap/config-metawrap_1.3.2 /home/metaWRAP/bin/config-metawrap -RUN chmod u+x /home/metaWRAP/bin/config-metawrap -RUN mamba run -n metawrap-env pip3 install drep -ENTRYPOINT ["tini", "--", "/bin/bash", "--rcfile", "/etc/skel/.bashrc"] diff --git a/docker/metawrap/mw b/docker/metawrap/mw new file mode 100755 index 0000000..281888d --- /dev/null +++ b/docker/metawrap/mw @@ -0,0 +1,5 @@ +#!/bin/bash +set +eu +source /etc/bash.bashrc +/home/metaWRAP/bin/metawrap "$@" +set -eu \ No newline at end of file diff --git a/docs/requirements.txt b/docs/requirements.txt index dbe98a1..6c6ace2 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -31,4 +31,5 @@ tornado==6.0.4 tqdm==4.48.2 zipp==3.1.0 mkdocs-git-revision-date-plugin +mkdocs-snakemake-rule-plugin mike diff --git a/metamorph b/metamorph index 3e5eb15..17a1f55 100755 --- a/metamorph +++ b/metamorph @@ -27,25 +27,34 @@ in any work or product based on this material. 
USAGE: $ metamorph [OPTIONS] -EXAMPLE: - co-assembly: - $ metamorph run --coa --input *.R?.fastq.gz --output output/ - $ metamorph run -C --input *.R?.fastq.gz --output output/ - per-sample assembly: - $ metamorph run --input *.R?.fastq.gz --output output/ +EXAMPLES: + co-assembly dna-only: + $ metamorph run --coa --input *.R?.fastq.gz --output output + $ metamorph run -C --input *.R?.fastq.gz --output output + + per-sample assembly dna-only: + $ metamorph run --input *.R?.fastq.gz --output output + + co-assembly rna & dna: + $ metamorph run --coa --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output + $ metamorph run -C --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output + + per-sample assembly rna & dna: + $ metamorph run --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output """ # Python standard library from __future__ import print_function import sys, os, subprocess, re, json, textwrap +from datetime import timezone, datetime # 3rd party imports from pypi -import argparse # potential python3 3rd party package, added in python/3.5 +import argparse # Local imports from src import version -from src.run import init, setup, bind, dryrun, runner, run_coa_pipeline +from src.run import init, setup, bind, dryrun, runner from src.utils import ( Colors, err, @@ -62,6 +71,7 @@ __version__ = version __authors__ = 'Neelam Redekar, Skyler Kuhn' __email__ = 'neelam.redekar nih.gov, skyler.kuhn nih.gov' __home__ = os.path.dirname(os.path.abspath(__file__)) +_datetime = int(datetime.now(tz=timezone.utc).timestamp() * 1000) _name = os.path.basename(sys.argv[0]) _description = 'An awesome metagenomics and metatranscriptomics pipeline' @@ -79,7 +89,10 @@ def unlock(sub_args): try: unlock_output = subprocess.check_output([ - 'snakemake', '--unlock', + 'snakemake', + '--unlock', + '--force', + '-s', os.path.abspath(f'{outdir}/workflow/Snakefile'), '--cores', '1', '--configfile=config.json' ], cwd = outdir, @@ -108,10 +121,16 @@ def run(sub_args): # copy over required resources to run # the pipeline git_repo = __home__ + + fastq_inputs = [sub_args.input] + + if sub_args.rna: + fastq_inputs.append(sub_args.rna) + input_files = init( repo_path = git_repo, output_path = sub_args.output, - links = sub_args.input + links = fastq_inputs ) # Step 2. Setup pipeline for execution, @@ -132,7 +151,8 @@ def run(sub_args): ) config['bindpaths'] = bindpaths - config['coassembly'] = sub_args.coa + # config['coassembly'] = sub_args.coa + config['coassembly'] = False # Step 4. Save config to output directory with open(os.path.join(sub_args.output, 'config.json'), 'w') as fh: @@ -159,13 +179,8 @@ def run(sub_args): log = os.path.join(sub_args.output, 'logfiles', 'master.log') logfh = open(log, 'w') - if sub_args.coa: - cjob = run_coa_pipeline(sub_args.mode, - sub_args.output, - sub_args.singularity_cache, - logfh, - sub_args.tmp_dir, - ",".join(bindpaths)) + if 'databases' in config: + bindpaths.extend([mount['from']+':'+mount['to']+':'+mount['mode'] for mount in config['databases']]) mjob = runner(mode = sub_args.mode, outdir = sub_args.output, @@ -176,7 +191,7 @@ def run(sub_args): submission_script=os.path.join(__home__, 'src', 'run.sh'), logger = logfh, additional_bind_paths = ",".join(bindpaths), - tmp_dir = sub_args.tmp_dir, + tmp_dir = sub_args.tmp_dir, ) # Step 6. Wait for subprocess to complete, @@ -283,9 +298,7 @@ def parsed_arguments(name, description): provided working directory has not been initialized, it will be created automatically. 
Example: --output /data/$USER/output - - {3}{4}Analysis options:{5} - ...coming soon! + {3}{4}Orchestration options:{5} --mode {{slurm,local}} @@ -363,35 +376,50 @@ def parsed_arguments(name, description): # Display example usage in epilog run_epilog = textwrap.dedent("""\ - {2}{3}Example:{4} - # Step 1.) Grab an interactive node, - # do not run on head node! - srun -N 1 -n 1 --time=1:00:00 --mem=8gb --cpus-per-task=2 --pty bash - module purge - module load singularity snakemake - - # Step 2A.) Dry-run the pipeline - ./{0} run --input .tests/*.R?.fastq.gz \\ - --output /data/$USER/output \\ - --mode slurm \\ - --dry-run - - # Step 3A.) Run the {0} pipeline in per-sample fashion - # The slurm mode will submit jobs to - # the cluster. It is recommended running - # the pipeline in this mode. - ./{0} run --input .tests/*.R?.fastq.gz \\ - --output /data/$USER/output \\ - --mode slurm - - # Step 3B.) Run the {0} pipeline in co-assembly fashion - # with slurm - ./{0} run --coa --input .tests/*.R?.fastq.gz \\ - --output /data/$USER/output \\ - --mode slurm + {2}{3}INPUT MODES:{4} + # Step 1.) Grab an interactive node, + # do not run on head node! + srun -N 1 -n 1 --time=1:00:00 --mem=8gb --cpus-per-task=2 --pty bash + module purge + module load singularity snakemake + + # Step 2A.) Dry-run the pipeline + ./{0} run --input .tests/*.R?.fastq.gz \\ + --output /data/$USER/output \\ + --mode slurm \\ + --dry-run + + # Step 3A.) Run the {0} pipeline in per-sample fashion + # The slurm mode will submit jobs to + # the cluster. It is recommended running + # the pipeline in this mode. + ./{0} run --input .tests/*.R?.fastq.gz \\ + --output /data/$USER/output \\ + --mode slurm + + # Step 3B.) Run the {0} pipeline in co-assembly fashion + # with slurm + ./{0} run --coa --input .tests/*.R?.fastq.gz \\ + --output /data/$USER/output \\ + --mode slurm + + {2}{3}EXAMPLES:{4} + co-assembly dna-only: + $ metamorph run --coa --input *.R?.fastq.gz --output output + $ metamorph run -C --input *.R?.fastq.gz --output output + + per-sample assembly dna-only: + $ metamorph run --input *.R?.fastq.gz --output output + + co-assembly rna & dna: + $ metamorph run --coa --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output + $ metamorph run -C --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output + + per-sample assembly rna & dna: + $ metamorph run --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output - {2}{3}Version:{4} + {2}{3}VERSION:{4} {1} """.format(name, __version__, c.bold, c.url, c.end)) @@ -416,6 +444,16 @@ def parsed_arguments(name, description): help = argparse.SUPPRESS ) + subparser_run.add_argument( + '--rna', + # Check if the file exists and if it is readable + type = lambda file: permissions(parser, file, os.R_OK), + required = False, + nargs = '+', + help = argparse.SUPPRESS + ) + + # Output Directory, i.e # working directory subparser_run.add_argument( @@ -450,12 +488,19 @@ def parsed_arguments(name, description): ) # a supported job scheduler, etc. 
- subparser_run.add_argument( - '-C', '--coa', - action="store_true", - required = False, - help = argparse.SUPPRESS - ) + # subparser_run.add_argument( + # '-C', '--coa', + # action="store_true", + # required = False, + # help = argparse.SUPPRESS + # ) + + # subparser_run.add_argument( + # '-R', '--rnacoa', + # action="store_true", + # required = False, + # help = argparse.SUPPRESS + # ) # Name of master job subparser_run.add_argument( @@ -510,7 +555,7 @@ def parsed_arguments(name, description): '--tmp-dir', type = str, required = False, - default = '/lscratch/$SLURM_JOBID/', + default = '/lscratch/$SLURM_JOB_ID/', help = argparse.SUPPRESS ) diff --git a/src/run.py b/src/run.py index 55050a9..96b7391 100644 --- a/src/run.py +++ b/src/run.py @@ -4,8 +4,8 @@ # Python standard library from __future__ import print_function from shutil import copytree -from uuid import uuid4 from datetime import datetime +from pathlib import Path import os, re, json, sys, subprocess # Local imports @@ -18,6 +18,10 @@ from . import version as __version__ +FASTQ_INPUT_EXT = ".fastq.gz" +FASTQ_R1_POSTFIX = f"_R1{FASTQ_INPUT_EXT}" +FASTQ_R2_POSTFIX = f"_R2{FASTQ_INPUT_EXT}" + def init(repo_path, output_path, links=[], required=['workflow', 'resources', 'config']): """Initialize the output directory. If user provides a output @@ -52,7 +56,17 @@ def init(repo_path, output_path, links=[], required=['workflow', 'resources', 'c # Create renamed symlinks for each rawdata # file provided as input to the pipeline - inputs = sym_safe(input_data = links, target = output_path) + try: + os.mkdir(os.path.join(output_path, 'dna')) + except FileExistsError: + pass + inputs = dict(dna=sym_safe(input_data = links[0], target = os.path.join(output_path, 'dna'))) + if len(links) == 2 and links[1]: + try: + os.mkdir(os.path.join(output_path, 'rna')) + except FileExistsError: + pass + inputs['rna'] = sym_safe(input_data = links[1], target = os.path.join(output_path, 'rna')) return inputs @@ -115,19 +129,19 @@ def rename(filename): # key = regex to match string and value = how it will be renamed extensions = { # Matches: _R[12]_fastq.gz, _R[12].fastq.gz, _R[12]_fq.gz, etc. - ".R1.f(ast)?q.gz$": ".R1.fastq.gz", - ".R2.f(ast)?q.gz$": ".R2.fastq.gz", + ".R1.f(ast)?q.gz$": FASTQ_R1_POSTFIX, + ".R2.f(ast)?q.gz$": FASTQ_R2_POSTFIX, # Matches: _R[12]_001_fastq_gz, _R[12].001.fastq.gz, _R[12]_001.fq.gz, etc. # Capture lane information as named group - ".R1.(?P...).f(ast)?q.gz$": ".R1.fastq.gz", - ".R2.(?P...).f(ast)?q.gz$": ".R2.fastq.gz", + ".R1.(?P...).f(ast)?q.gz$": FASTQ_R1_POSTFIX, + ".R2.(?P...).f(ast)?q.gz$": FASTQ_R2_POSTFIX, # Matches: _[12].fastq.gz, _[12].fq.gz, _[12]_fastq_gz, etc. - "_1.f(ast)?q.gz$": ".R1.fastq.gz", - "_2.f(ast)?q.gz$": ".R2.fastq.gz" + "_1.f(ast)?q.gz$": FASTQ_R1_POSTFIX, + "_2.f(ast)?q.gz$": FASTQ_R2_POSTFIX } - if (filename.endswith('.R1.fastq.gz') or - filename.endswith('.R2.fastq.gz')): + if (filename.endswith(FASTQ_R1_POSTFIX) or + filename.endswith(FASTQ_R2_POSTFIX)): # Filename is already in the correct format return filename @@ -215,27 +229,6 @@ def setup(sub_args, ifiles, repo_path, output_path): return config -def unpacked(nested_dict): - """Generator to recursively retrieves all values in a nested dictionary. 
- @param nested_dict dict[]: - Nested dictionary to unpack - @yields value in dictionary - """ - # Iterate over all values of - # given dictionary - for value in nested_dict.values(): - # Check if value is of dict type - if isinstance(value, dict): - # If value is dict then iterate - # over all its values recursively - for v in unpacked(value): - yield v - else: - # If value is not dict type - # then yield the value - yield value - - def get_fastq_screen_paths(fastq_screen_confs, match = 'DATABASE', file_index = -1): """Parses fastq_screen.conf files to get the paths of each fastq_screen database. This path contains bowtie2 indices for reference genome to screen against. @@ -322,19 +315,31 @@ def bind(sub_args, config): List of singularity/docker bind paths """ bindpaths = [] - for value in unpacked(config): - if not isinstance(value, str): - continue - if exists(value): - if os.path.isfile(value): - value = os.path.dirname(value) - if value not in bindpaths: - bindpaths.append(value) + resolve = lambda x: str(Path(x).resolve()) + + if 'databases' in config: + dbs = config.pop('databases') + bindpaths.extend([resolve(mount['from'])+':'+resolve(mount['to'])+':'+mount['mode'] for mount in dbs]) + + if 'options' in config and 'input' in config['options']: + inrents = list(set([os.path.abspath(os.path.dirname(p)) for p in config['options']['input'] if os.path.exists(os.path.dirname(p)) and os.path.isdir(os.path.dirname(p))])) + bindpaths.extend(inrents) + + if 'options' in config and 'rna' in config['options']: + rnarents = list(set([os.path.abspath(os.path.dirname(p)) for p in config['options']['rna'] if os.path.exists(os.path.dirname(p)) and os.path.isdir(os.path.dirname(p))])) + bindpaths.extend(rnarents) + + if 'options' in config and 'output' in config['options']: + if os.path.exists(config['options']['output']) and os.path.isdir(config['options']['output']): + bindpaths.append(os.path.abspath(config['options']['output'])) + + if 'tmp_dir' in config: + bindpaths.append(config['tmp_dir']) rawdata_bind_paths = [os.path.abspath(p) for p in config['project']['datapath'].split(',')] working_directory = os.path.realpath(config['project']['workpath']) - return bindpaths + return list(set(bindpaths)) def mixed_inputs(ifiles): @@ -348,7 +353,7 @@ def mixed_inputs(ifiles): fastqs = False bams = False for file in ifiles: - if file.endswith('.R1.fastq.gz') or file.endswith('.R2.fastq.gz'): + if file.endswith(FASTQ_R1_POSTFIX) or file.endswith(FASTQ_R2_POSTFIX): fastqs = True fq_files.append(file) elif file.endswith('.bam'): @@ -393,13 +398,18 @@ def add_user_information(config): # username config['project']['userhome'] = home config['project']['username'] = username - dt = datetime.now().strftime("%m/%d/%Y") - config['project']['id'] = f"{uuid4()}_{dt}_metagenome" + + # dt = datetime.now().strftime("%m_%d_%Y") + # config['project']['id'] = f"{dt}_metagenome_results" + + # TODO: figure up way to uniquely ID results, engineering out + # the problem of misidentifing results files + config['project']['id'] = "metagenome_results" return config -def add_sample_metadata(input_files, config, group=None): +def add_sample_metadata(input_files, config, group_key='samples'): """Adds sample metadata such as sample basename, label, and group information. If sample sheet is provided, it will default to using information in that file. If no sample sheet is provided, it will only add sample basenames and labels. 
@@ -412,20 +422,15 @@ def add_sample_metadata(input_files, config, group=None): @return config : Updated config with basenames, labels, and groups (if provided) """ - import re - - # TODO: Add functionality for basecase - # when user has samplesheet added = [] - config['samples'] = [] + config[group_key] = [] for file in input_files: # Split sample name on file extension - sample = re.split('\.R[12]\.fastq\.gz', os.path.basename(file))[0] + sample = re.split('[\S]R[12]', os.path.basename(file))[0] if sample not in added: # Only add PE sample information once added.append(sample) - config['samples'].append(sample) - + config[group_key].append(sample) return config @@ -448,17 +453,23 @@ def add_rawdata_information(sub_args, config, ifiles): # or single-end # Updates config['project']['nends'] where # 1 = single-end, 2 = paired-end, -1 = bams + convert = {1: 'single-end', 2: 'paired-end', -1: 'bam'} - nends = get_nends(ifiles) # Checks PE data for both mates (R1 and R2) + + nends = get_nends(ifiles['dna']) # Checks PE data for both mates (R1 and R2) config['project']['nends'] = nends config['project']['filetype'] = convert[nends] # Finds the set of rawdata directories to bind - rawdata_paths = get_rawdata_bind_paths(input_files = sub_args.input) - config['project']['datapath'] = ','.join(rawdata_paths) + config['project']['datapath'] = ','.join(get_rawdata_bind_paths(input_files = sub_args.input)) + if sub_args.rna: + config["project"]["rna_datapath"] = ','.join(get_rawdata_bind_paths(input_files = sub_args.rna)) # Add each sample's basename - config = add_sample_metadata(input_files = ifiles, config = config) + config = add_sample_metadata(ifiles['dna'], config) + + if 'rna' in ifiles: + config = add_sample_metadata(ifiles['rna'], config, group_key='rna') return config @@ -518,7 +529,7 @@ def get_nends(ifiles): bam_files = True nends_status = -1 break - elif file.endswith('.R2.fastq.gz'): + elif file.endswith(FASTQ_R2_POSTFIX): paired_end = True nends_status = 2 break # dataset is paired-end @@ -529,7 +540,7 @@ def get_nends(ifiles): nends = {} # keep count of R1 and R2 for each sample for file in ifiles: # Split sample name on file extension - sample = re.split('\.R[12]\.fastq\.gz', os.path.basename(file))[0] + sample = re.split('\_R[12]\.fastq\.gz', os.path.basename(file))[0] if sample not in nends: nends[sample] = 0 @@ -543,8 +554,8 @@ def get_nends(ifiles): both mates (R1 and R2) for the following samples:\n\t\t{}\n Please check that the basename for each sample is consistent across mates. Here is an example of a consistent basename across mates: - consistent_basename.R1.fastq.gz - consistent_basename.R2.fastq.gz + consistent_basename_R1.fastq.gz + consistent_basename_R2.fastq.gz Please do not run the pipeline with a mixture of single-end and paired-end samples. 
This feature is currently not supported within {}, and it is @@ -622,130 +633,6 @@ def dryrun(outdir, config='config.json', snakefile=os.path.join('workflow', 'Sna return dryrun_output -def run_coa_pipeline(mode, outdir, alt_cache, logger, tmp_dir, additional_bind_paths): - # gzip compression speed: ~10.5 MB/s - # see: https://tukaani.org/lzma/benchmarks.html - # large fastq ~20GB - # 96 samples x 2 (R1 + R2) = 192 fastqs - # total size (high estimate): 192 fastqs * 20GB/fastq = 3,840 GB = 3,840,000 MB - # total compression time: 3,840,000 MB / (10.5 MB/s) = 365714 s = ~ 4 days - # pigz ~6.5 times faster than gzip - # https://github.com/neurolabusc/pigz-bench - # 3,840,000 MB / (10.5 MB/s * 6.5) = ~ 15.6 hours - # ~~~~~~~~~~~~~~ - - # Add additional singularity bind PATHs - # to mount the local filesystem to the - # containers filesystem, NOTE: these - # PATHs must be an absolute PATHs - outdir = os.path.abspath(outdir).strip() - # Add any default PATHs to bind to - # the container's filesystem, like - # tmp directories, /lscratch - addpaths = [] - temp = os.path.dirname(tmp_dir.rstrip('/')) - if temp == os.sep: - temp = tmp_dir.rstrip('/') - if outdir not in additional_bind_paths.split(','): - addpaths.append(outdir) - if temp not in additional_bind_paths.split(','): - addpaths.append(temp) - bindpaths = ','.join(addpaths) - - # Set ENV variable 'SINGULARITY_CACHEDIR' - # to output directory - my_env = {}; my_env.update(os.environ) - cache = os.path.join(outdir, ".singularity") - my_env['SINGULARITY_CACHEDIR'] = cache - if alt_cache: - # Override the pipeline's default - # cache location - my_env['SINGULARITY_CACHEDIR'] = alt_cache - cache = alt_cache - - if additional_bind_paths: - # Add Bind PATHs for outdir and tmp dir - if bindpaths: - bindpaths = ",{}".format(bindpaths) - bindpaths = "{}{}".format(additional_bind_paths,bindpaths) - - if not exists(os.path.join(outdir, 'logfiles')): - # Create directory for logfiles - os.makedirs(os.path.join(outdir, 'logfiles')) - - # Create .singularity directory for - # installations of snakemake without - # setuid which creates a sandbox in - # the SINGULARITY_CACHEDIR - if not exists(cache): - # Create directory for sandbox - # and image layers - os.makedirs(cache, mode=0o755) - - snakefile = os.path.abspath(os.path.join(__file__, '..', 'workflow', 'coa', 'Snakefile')) - slurm_dir = os.path.abspath(os.path.join(outdir, 'slurm')) - if not os.path.exists(slurm_dir): - os.mkdir(slurm_dir, mode=0o755) - - CLUSTER_OPTS = "sbatch --gres {cluster.gres}" + \ - " --cpus-per-task {cluster.threads}" + \ - " -p {cluster.partition}" + \ - " -t {cluster.time}" + \ - " --mem {cluster.mem}" + \ - " --job-name={params.rname}" + \ - " -e $SLURM_DIR/slurm-%j_{params.rname}.out" + \ - " -o $SLURM_DIR/slurm-%j_{params.rname}.out" - - sbatch_params = [ - "#SBATCH --cpus-per-task=28", - "#SBATCH --mem=64g", - "#SBATCH --time=10-00:00:00", - "#SBATCH -p norm", - "#SBATCH --parsable", - "#SBATCH -J \"metagenome_coa\"", - "#SBATCH --mail-type=BEGIN,END,FAIL", - "#SBATCH --output \"" + outdir + "/logfiles/snakemake.log\"", - "#SBATCH --error \"" + outdir + "/logfiles/snakemake.log\"", - ] - - jobscript = [ - "#!/usr/bin/env bash", - "module load snakemake singularity", - "snakemake \\", - "--latency-wait 120 \\", - "-s " + snakefile + " \\", - "-d \"{outdir}\" \\", - "--use-singularity \\", - "--singularity-args \"'-B " + bindpaths + "'\" \\", - "--configfile=\"" + outdir + "/config.json\" \\", - "--printshellcmds \\", - "--cluster-config \"" + outdir + 
"/resources/cluster.json\" \\", - "--cluster \"" + CLUSTER_OPTS + "\" \\", - "--keep-going \\", - "--restart-times 3 \\", - "-j 500 \\", - "--rerun-incomplete --stats \"" + outdir + "/logfiles/runtime_statistics.json\" \\", - "--keep-remote \\", - "--local-cores 28 2>&1 | tee -a \"" + outdir + "/logfiles/master.log\"", - ] - - exec_sh = 'bash' - if mode == 'slurm': - exec_sh = 'sbatch' - jobscript = [jobscript[0], *sbatch_params, *jobscript[1:]] - - coa_jobscript = os.path.join(slurm_dir, 'jobscript.sh') - with open(coa_jobscript, 'w') as fo: - fo.write("\n".join(jobscript)) - - coajob = subprocess.Popen([ - exec_sh, str(coa_jobscript) - ], cwd = outdir, stderr=subprocess.STDOUT, stdout=logger, env=my_env) - - coajob.wait() - return coajob.returncode - - try: __job_name__ = 'metamorph_' + os.getlogin() + ':master' except OSError: @@ -760,7 +647,7 @@ def runner( threads=2, jobname=__job_name__, submission_script='run.sh', - tmp_dir = '/lscratch/$SLURM_JOBID/' + tmp_dir = '/lscratch/$SLURM_JOB_ID/' ): """Runs the pipeline via selected executor: local or slurm. If 'local' is selected, the pipeline is executed locally on a compute node/instance. @@ -817,7 +704,7 @@ def runner( # Add Bind PATHs for outdir and tmp dir if bindpaths: bindpaths = ",{}".format(bindpaths) - bindpaths = "{}{}".format(additional_bind_paths,bindpaths) + bindpaths = "{}{}".format(additional_bind_paths, bindpaths) if not exists(os.path.join(outdir, 'logfiles')): # Create directory for logfiles @@ -840,9 +727,12 @@ def runner( # replacing Popen subprocess with a direct # snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html masterjob = subprocess.Popen([ - 'snakemake', '-pr', '--rerun-incomplete', + 'snakemake', '-pr', + '--rerun-incomplete', + '--rerun-triggers input', + '--verbose', '--use-singularity', - '--singularity-args', "'-B {}'".format(bindpaths), + '--singularity-args', "\\-c \\-B '{}'".format(bindpaths), '--cores', str(threads), '--configfile=config.json' ], cwd = outdir, stderr=subprocess.STDOUT, stdout=logger, env=my_env) diff --git a/src/run.sh b/src/run.sh index 1c3bb06..790c8bc 100755 --- a/src/run.sh +++ b/src/run.sh @@ -209,12 +209,11 @@ function submit(){ if [[ ${6#\'} != /lscratch* ]]; then CLUSTER_OPTS="sbatch --cpus-per-task {cluster.threads} -p {cluster.partition} -t {cluster.time} --mem {cluster.mem} --job-name={params.rname} -e $SLURM_DIR/slurm-%j_{params.rname}.out -o $SLURM_DIR/slurm-%j_{params.rname}.out" fi - # Create sbacth script to build index cat << EOF > kickoff.sh #!/usr/bin/env bash -#SBATCH --cpus-per-task=16 -#SBATCH --mem=96g -#SBATCH --time=5-00:00:00 +#SBATCH --cpus-per-task=16 +#SBATCH --mem=32g +#SBATCH --time=10-00:00:00 #SBATCH --parsable #SBATCH -J "$2" #SBATCH --mail-type=BEGIN,END,FAIL @@ -222,15 +221,29 @@ function submit(){ #SBATCH --error "$3/logfiles/snakemake.log" set -euo pipefail # Main process of pipeline -snakemake --latency-wait 120 -s "$3/workflow/Snakefile" -d "$3" \\ - --use-singularity --singularity-args "'-B $4'" \\ - --use-envmodules --configfile="$3/config.json" \\ - --printshellcmds --cluster-config "$3/config/cluster.json" \\ - --cluster "${CLUSTER_OPTS}" --keep-going --restart-times 3 -j 500 \\ - --rerun-incomplete --stats "$3/logfiles/runtime_statistics.json" \\ - --keep-remote --local-cores 14 2>&1 +snakemake \\ + -p \\ + --latency-wait 120 \\ + -s "$3/workflow/Snakefile" \\ + -d "$3" \\ + --use-singularity \\ + --singularity-args "\\-c \\-B '$4'" \\ + --use-envmodules \\ + --verbose \\ + --configfile 
"$3/config.json" \\ + --printshellcmds \\ + --cluster-config $3/config/cluster.json \\ + --cluster "${CLUSTER_OPTS}" \\ + --keep-going \\ + --rerun-incomplete \\ + --jobs 500 \\ + --keep-remote \\ + --stats "$3/logfiles/runtime_statistics.json" \\ + --restart-times 0 \\ + --keep-incomplete \\ + --local-cores "14" 2>&1 # Create summary report -snakemake -d "$3" --report "Snakemake_Report.html" +snakemake -s "$3/workflow/Snakefile" -d "$3" --configfile="$3/config.json" --report "Snakemake_Report.html" EOF chmod +x kickoff.sh job_id=$(sbatch kickoff.sh | tee -a "$3"/logfiles/master.log) diff --git a/workflow/Snakefile b/workflow/Snakefile index 617cf7d..e568367 100644 --- a/workflow/Snakefile +++ b/workflow/Snakefile @@ -2,12 +2,25 @@ import json import os, sys from os.path import join from os import listdir -from scripts.common import allocated, provided, references, str_bool +from scripts.common import allocated, provided, references, str_bool, list_bool -workpath = config["project"]["workpath"] # Pipeline"s output directory -tmpdir = config["options"]["tmp_dir"] # Temporary directory -samples = config["samples"] # List containing basename of samples +# Global workflow variables +configfile: "config.json" # Generated from user input and config/*.json + +datapath = config["project"]["datapath"] +rna_datapath = config["project"].get("rna_datapath", []) +workpath = config["project"]["workpath"] +tmpdir = config["options"]["tmp_dir"] +coassemble = config['coassembly'] is True +rna_included = list_bool(config.get("rna", 'false')) +rna_coasm = str_bool(config["options"].get("rnacoa", 'False')) +rna_sample_stems = config.get("rna", []) +samples = config["samples"] if not coassemble else ['concatenated'] +top_readqc_dir = join(workpath, config['project']['id'], "metawrap_read_qc") +top_trim_dir = join(workpath, config['project']['id'], "trimmed_reads") +top_readqc_dir_rna = join(workpath, config['project']['id'], "metawrap_read_qc_RNA") +top_trim_dir_rna = join(workpath, config['project']['id'], "trimmed_reads_RNA") # Read in resource information, containing information about threads, mem, walltimes, etc. 
@@ -15,38 +28,46 @@ samples = config["samples"] # List containing basename of samples with open(join(workpath, "config", "cluster.json")) as fh: cluster = json.load(fh) +if coassemble: + start_r1 = expand(join(workpath, "dna", "{name}_R1.fastq.gz"), name=['concatenated']) + start_r2 = expand(join(workpath, "dna", "{name}_R2.fastq.gz"), name=['concatenated']) +else: + start_r1 = expand(join(workpath, "dna", "{name}_R1.fastq.gz"), name=samples) + start_r2 = expand(join(workpath, "dna", "{name}_R2.fastq.gz"), name=samples) -# Global workflow variables -configfile: "config.json" # Generated from user input and config/*.json +if rna_included: + if rna_coasm: + start_r1_rna = expand(join(workpath, "rna", "{name}_R1.fastq.gz"), name=['concatenated']) + start_r2_rna = expand(join(workpath, "rna", "{name}_R2.fastq.gz"), name=['concatenated']) + else: + start_r1_rna = expand(join(workpath, "rna", "{rname}_R1.fastq.gz"), rname=rna_sample_stems) + start_r2_rna = expand(join(workpath, "rna", "{rname}_R2.fastq.gz"), rname=rna_sample_stems) +else: + start_r1_rna, start_r2_rna = [], [] # Final ouput files of the pipeline rule all: input: + ################# + # DNA outputs # + ################# + # dna read decompress + # ~~~~~~~~~~~~~~~ + start_r1, + start_r2, # read qc and filtering # ~~~~~~~~~~~~~~~ - expand( - join( - workpath, config['project']['id'], "metawrap_qc", "{name}", "{name}.R1_bmtagger_report.html" - ), - name=samples, - ), - expand( - join( - workpath, config['project']['id'], "metawrap_qc", "{name}", "{name}.R2_bmtagger_report.html" - ), - name=samples, - ), + expand(join(top_readqc_dir, "{name}", "{name}_R1_pretrim_report.html"), name=samples), + expand(join(top_readqc_dir, "{name}", "{name}_R2_pretrim_report.html"), name=samples), + expand(join(top_readqc_dir, "{name}", "{name}_R1_postrim_report.html"), name=samples), + expand(join(top_readqc_dir, "{name}", "{name}_R2_postrim_report.html"), name=samples), + expand(join(top_trim_dir, "{name}", "{name}_R1_trimmed.fastq.gz"), name=samples), + expand(join(top_trim_dir, "{name}", "{name}_R2_trimmed.fastq.gz"), name=samples), # genome assembly # ~~~~~~~~~~~~~~~ - expand( - join(workpath, config['project']['id'], "metawrap_assembly", "{name}", "final_assembly.fasta"), - name=samples, - ), - expand( - join(workpath, config['project']['id'], "metawrap_assembly", "{name}", "assembly_report.html"), - name=samples, - ), + expand(join(workpath, config['project']['id'], "metawrap_assembly", "{name}", "final_assembly.fasta"), name=samples), + expand(join(workpath, config['project']['id'], "metawrap_assembly", "{name}", "assembly_report.html"), name=samples), # taxonomic classification # ~~~~~~~~~~~~~~~ expand(join(workpath, config['project']['id'], "metawrap_kmer", "{name}", "final_assembly.krona"), name=samples), @@ -65,8 +86,24 @@ rule all: # bin refinement # ~~~~~~~~~~~~~~~ expand(join(workpath, config['project']['id'], "metawrap_bin_refine", "{name}", "dereplicated_bins"), name=samples), - + ################# + # RNA outputs # + ################# + # rna read decompress + # ~~~~~~~~~~~~~~~ + start_r1_rna, + start_r1_rna, + # read qc and filtering + # ~~~~~~~~~~~~~~~ + expand(join(top_readqc_dir_rna, "{rname}", "{rname}_R1_pretrim_report.html"), rname=rna_sample_stems), + expand(join(top_readqc_dir_rna, "{rname}", "{rname}_R2_pretrim_report.html"), rname=rna_sample_stems), + expand(join(top_readqc_dir_rna, "{rname}", "{rname}_R1_postrim_report.html"), rname=rna_sample_stems), + expand(join(top_readqc_dir_rna, "{rname}", 
"{rname}_R2_postrim_report.html"), rname=rna_sample_stems), + expand(join(top_trim_dir_rna, "{rname}", "{rname}_R1_trimmed.fastq.gz"), rname=rna_sample_stems), + expand(join(top_trim_dir_rna, "{rname}", "{rname}_R2_trimmed.fastq.gz"), rname=rna_sample_stems), # Import rules -include: join("rules", "metawrap.smk") \ No newline at end of file +include: join("rules", "DNA.smk") +include: join("rules", "RNA.smk") +include: join("rules", "hooks.smk") \ No newline at end of file diff --git a/workflow/coa/Snakefile b/workflow/coa/Snakefile deleted file mode 100644 index f125d53..0000000 --- a/workflow/coa/Snakefile +++ /dev/null @@ -1,40 +0,0 @@ -# ~~~~~~~~~~ -# Metagenome read concatenation -# ~~~~~~~~~~ -ruleorder: - cat_reads < decompress_reads - - -rule all: - input: - expand(join(workpath, "{name}.R{rnum}.fastq.gz", name=samples)), - - - -rule decompress_reads: - input: - all_reads = join(workpath, "{name}.R{rnum}.fastq.gz") - output: - all_uncompressed_read = join(workpath, "{name}.R{rnum}.fastq.gz") - shell: - "gunzip {input.all_reads}" - - -rule cat_reads_and_compress: - input: - all_r1_reads = expand(join(workpath, "{name}.R1.fastq")) - all_r2_reads = expand(join(workpath, "{name}.R2.fastq")) - output: - big_read_r1 = join(workpath, "coa.R1.fastq.gz") - big_read_r2 = join(workpath, "coa.R1.fastq.gz") - params: - uncompressed_r1 = lambda wc, thisout: Path(thisout.big_read_r1).name[:-3] - uncompressed_r2 = lambda wc, thisout: Path(thisout.big_read_r2).name[:-3] - threads: 28 - shell: - """ - cat {input.all_r1_reads} > {params.uncompressed_r1} - cat {input.all_r2_reads} > {params.uncompressed_r2} - pigz -9 {params.uncompressed_r1} - pigz -9 {params.uncompressed_r2} - """ \ No newline at end of file diff --git a/workflow/rules/DNA.smk b/workflow/rules/DNA.smk new file mode 100644 index 0000000..f485e25 --- /dev/null +++ b/workflow/rules/DNA.smk @@ -0,0 +1,459 @@ +# ~~~~~~~~~~ +# Metawrap metagenome assembly and analysis rules +# ~~~~~~~~~~ +from os.path import join, basename +from itertools import chain +from uuid import uuid4 + + +# ~~~~~~~~~~ +# Constants and paths +# ~~~~~~~~~~ +# resource defaults +default_threads = cluster["__default__"]['threads'] +default_memory = cluster["__default__"]['mem'] +# directories +workpath = config["project"]["workpath"] +datapath = config["project"]["datapath"] +samples = config["samples"] if not coassemble else ['concatenated'] +top_log_dir = join(workpath, "logfiles") +top_readqc_dir = join(workpath, config['project']['id'], "metawrap_read_qc") +top_trim_dir = join(workpath, config['project']['id'], "trimmed_reads") +top_assembly_dir = join(workpath, config['project']['id'], "metawrap_assembly") +top_tax_dir = join(workpath, config['project']['id'], "metawrap_kmer") +top_binning_dir = join(workpath, config['project']['id'], "metawrap_binning") +top_refine_dir = join(workpath, config['project']['id'], "metawrap_bin_refine") +# workflow flags +metawrap_container = config["containers"]["metawrap"] +pairedness = list(range(1, config['project']['nends']+1)) +mem2int = lambda x: int(str(x).lower().replace('gb', '').replace('g', '')) + +""" + Step-wise pipeline outline: + 0. concat or no concat + 1. read qc + 2. assembly + 3. binning + 4. bin refinement + 5. depreplicate bins + 6. annotate bins + 7. index depreplicated genomes + 8. 
align DNA to assembly +""" + +rule concat_reads: + input: + all_r1_reads = expand(join(workpath, "dna", "{sid}_R1.fastq.gz"), sid=config['samples'] if config['coassembly'] else []), + all_r2_reads = expand(join(workpath, "dna", "{sid}_R2.fastq.gz"), sid=config['samples'] if config['coassembly'] else []), + output: + big_compressed_read_r1 = join(workpath, "dna", "concatenated_R1.fastq.gz"), + big_compressed_read_r2 = join(workpath, "dna", "concatenated_R2.fastq.gz"), + big_read1_hash = join(workpath, "dna", "concatenated_R1.md5"), + big_read2_hash = join(workpath, "dna", "concatenated_R2.md5"), + params: + big_read_r1 = join(workpath, "dna", "concatenated_R1.fastq"), + big_read_r2 = join(workpath, "dna", "concatenated_R2.fastq"), + rname = "concat_reads", + input_dir = join(workpath, "dna"), + threads: int(cluster["concat_reads"].get('threads', default_threads)), + shell: + """ + shopt -s extglob + # concat r1 + for fastq in {params.input_dir}/*R1*f?(ast)q*; do + ext=$(echo "${{fastq: -2}}" | tr '[:upper:]' '[:lower:]') + if [[ "$ext" == "gz" ]]; then + zcat $fastq >> {params.big_read_r1} + else + cat $fastq >> {params.big_read_r1} + fi; + done + + # concat r2 + for fastq in {params.input_dir}/*R2*f?(ast)q*; do + ext=$(echo "${{fastq: -2}}" | tr '[:upper:]' '[:lower:]') + if [[ "$ext" == "gz" ]]; then + zcat $fastq > {params.big_read_r2} + else + cat $fastq > {params.big_read_r2} + fi; + done + shopt -u extglob + + pigz -9 -p 28 -c {params.big_read_r1} > {output.big_compressed_read_r1} + pigz -9 -p 28 -c {params.big_read_r2} > {output.big_compressed_read_r2} + md5sum {output.big_compressed_read_r1} > {output.big_read1_hash} + md5sum {output.big_compressed_read_r2} > {output.big_read2_hash} + rm {params.big_read_r1} {params.big_read_r2} + """ + + +rule metawrap_read_qc: + """ + The metaWRAP::Read_qc module is meant to pre-process raw Illumina sequencing reads in preparation for assembly and alignment. + The raw reads are trimmed based on adapted content and PHRED scored with the default setting of Trim-galore, ensuring that only high-quality + sequences are left. Then reads are then aligned to the host genome (e.g. human) with bmtagger, and any host reads are removed from the + metagenomic data to remove host contamination. Read pairs where only one read was aligned to the host genome are also removed. + Finally, FASTQC is used to generate quality reports of the raw and final read sets in order to assess read quality improvement. + The users have control over which of the above features they wish to use. 
+ + Quality-control step accomplishes three tasks: + - Trims adapter sequences from input read pairs (.fastq.gz) + - Quality filters from input read pairs + - Removes human contamination from input read pairs + + @Container requirements + - [MetaWrap](https://github.com/bxlab/metaWRAP) orchestrates execution of + these tools in a psuedo-pipeline: + - [FastQC](https://github.com/s-andrews/FastQC) + - [bmtagger](ftp://ftp.ncbi.nlm.nih.gov/pub/agarwala/bmtagger/) + - [Trim-Galore](https://github.com/FelixKrueger/TrimGalore) + - [Cutadapt](https://github.com/marcelm/cutadapt) + + @Environment specifications + - minimum 16gb memory + - will load bmtagger index in memory (8gb) + + @Input: + - Raw fastq reads (R1 & R2 per-sample) from the instrument + + @Outputs: + - Trimmed, quality filtered reads (R1 & R2) + - FastQC html report and zip file on trimmed data + """ + input: + R1 = join(workpath, "dna", "{name}_R1.fastq.gz"), + R2 = join(workpath, "dna", "{name}_R2.fastq.gz"), + output: + R1_pretrim_report = join(top_readqc_dir, "{name}", "{name}_R1_pretrim_report.html"), + R2_pretrim_report = join(top_readqc_dir, "{name}", "{name}_R2_pretrim_report.html"), + R1_postrim_report = join(top_readqc_dir, "{name}", "{name}_R1_postrim_report.html"), + R2_postrim_report = join(top_readqc_dir, "{name}", "{name}_R2_postrim_report.html"), + R1_trimmed = join(top_trim_dir, "{name}", "{name}_R1_trimmed.fastq"), + R2_trimmed = join(top_trim_dir, "{name}", "{name}_R2_trimmed.fastq"), + R1_trimmed_gz = join(top_trim_dir, "{name}", "{name}_R1_trimmed.fastq.gz"), + R2_trimmed_gz = join(top_trim_dir, "{name}", "{name}_R2_trimmed.fastq.gz"), + params: + rname = "metawrap_read_qc", + sid = "{name}", + this_qc_dir = join(top_readqc_dir, "{name}"), + trim_out = join(top_trim_dir, "{name}"), + tmp_safe_dir = join(config['options']['tmp_dir'], 'read_qc'), + tmpr1 = lambda _, output, input: join(config['options']['tmp_dir'], 'read_qc', str(basename(str(input.R1))).replace('_R1.', '_1.').replace('.gz', '')), + tmpr2 = lambda _, output, input: join(config['options']['tmp_dir'], 'read_qc', str(basename(str(input.R2))).replace('_R2.', '_2.').replace('.gz', '')), + containerized: metawrap_container, + threads: int(cluster["metawrap_genome_assembly"].get('threads', default_threads)), + shell: + """ + # safe temp directory + if [ ! 
-d "{params.tmp_safe_dir}" ]; then mkdir -p "{params.tmp_safe_dir}"; fi + tmp=$(mktemp -d -p "{params.tmp_safe_dir}") + trap 'rm -rf "{params.tmp_safe_dir}"' EXIT + + # uncompress to lscratch + rone="{input.R1}" + ext=$(echo "${{rone: -2}}" | tr '[:upper:]' '[:lower:]') + if [[ "$ext" == "gz" ]]; then + zcat {input.R1} > {params.tmpr1} + else + ln -s {input.R1} {params.tmpr1} + fi; + rtwo="{input.R2}" + ext=$(echo "${{rtwo: -2}}" | tr '[:upper:]' '[:lower:]') + if [[ "$ext" == "gz" ]]; then + zcat {input.R2} > {params.tmpr2} + else + ln -s {input.R2} {params.tmpr2} + fi; + + # read quality control, host removal + # TODO: add support for mouse reads (mm10 genome prefix, "-x mm10") + mw read_qc -1 {params.tmpr1} -2 {params.tmpr2} -t {threads} -o {params.this_qc_dir} + + # collate fastq outputs to facilitate workflow, compress + ln -s {params.this_qc_dir}/final_pure_reads_1.fastq {params.trim_out}/{params.sid}_R1_trimmed.fastq + ln -s {params.this_qc_dir}/final_pure_reads_2.fastq {params.trim_out}/{params.sid}_R2_trimmed.fastq + pigz -9 -p {threads} -c {params.this_qc_dir}/final_pure_reads_1.fastq > {params.trim_out}/{params.sid}_R1_trimmed.fastq.gz + pigz -9 -p {threads} -c {params.this_qc_dir}/final_pure_reads_2.fastq > {params.trim_out}/{params.sid}_R2_trimmed.fastq.gz + + # collate outputs to facilitate + mkdir -p {params.this_qc_dir}/dna + ln -s {params.this_qc_dir}/post-QC_report/final_pure_reads_1_fastqc.html {params.this_qc_dir}/{params.sid}_R1_postrim_report.html + ln -s {params.this_qc_dir}/post-QC_report/final_pure_reads_2_fastqc.html {params.this_qc_dir}/{params.sid}_R2_postrim_report.html + ln -s {params.this_qc_dir}/pre-QC_report/{params.sid}_1_fastqc.html {params.this_qc_dir}/{params.sid}_R1_pretrim_report.html + ln -s {params.this_qc_dir}/pre-QC_report/{params.sid}_2_fastqc.html {params.this_qc_dir}/{params.sid}_R2_pretrim_report.html + """ + + +rule metawrap_genome_assembly: + """ + The metaWRAP::Assembly module allows the user to assemble a set of metagenomic reads with either + metaSPAdes or MegaHit (default). While metaSPAdes results in a superior assembly in most samples, + MegaHit is much more memory efficient, faster, and scales well with large datasets. + In addition to simplifying parameter selection for the user, this module also sorts and formats + the MegaHit assembly in a way that makes it easier to inspect. The contigs are sorted by length + and their naming is changed to resemble that of SPAdes, including the contig ID, length, and coverage. + Finally, short scaffolds are discarded (<1000bp), and an assembly report is generated with QUAST. 
+ + @Input: + Clean trimmed fastq reads (R1 & R2 per sample) + + @Output: + Megahit assembled contigs and reports + Metaspades assembled contigs and reports + Ensemble assembled contigs and reports + """ + input: + R1 = expand(join(top_trim_dir, "{name}", "{name}_R1_trimmed.fastq"), name=samples), + R2 = expand(join(top_trim_dir, "{name}", "{name}_R2_trimmed.fastq"), name=samples), + output: + # megahit outputs + megahit_assembly = expand(join(top_assembly_dir, "{name}", "megahit", "final.contigs.fa"), name=samples), + megahit_longcontigs = expand(join(top_assembly_dir, "{name}", "megahit", "long.contigs.fa"), name=samples), + megahit_log = expand(join(top_assembly_dir, "{name}", "megahit", "log"), name=samples), + # metaspades outsputs + metaspades_assembly = expand(join(top_assembly_dir, "{name}", "metaspades", "contigs.fasta"), name=samples), + metaspades_graph = expand(join(top_assembly_dir, "{name}", "metaspades", "assembly_graph.fastg"), name=samples), + metaspades_longscaffolds = expand(join(top_assembly_dir, "{name}", "metaspades", "long_scaffolds.fasta"), name=samples), + metaspades_scaffolds = expand(join(top_assembly_dir, "{name}", "metaspades", "scaffolds.fasta"), name=samples), + metaspades_cor_readsr1 = expand(join(top_assembly_dir, "{name}", "metaspades", "corrected", "{name}_1.00.0_0.cor.fastq.gz"), name=samples), + metaspades_cor_readsr2 = expand(join(top_assembly_dir, "{name}", "metaspades", "corrected", "{name}_2.00.0_0.cor.fastq.gz"), name=samples), + # ensemble outputs + final_assembly = expand(join(top_assembly_dir, "{name}", "final_assembly.fasta"), name=samples), + final_assembly_report = expand(join(top_assembly_dir, "{name}", "assembly_report.html"), name=samples), + assembly_R1 = expand(join(top_assembly_dir, "{name}", "{name}_1.fastq"), name=samples), + assembly_R2 = expand(join(top_assembly_dir, "{name}", "{name}_2.fastq"), name=samples), + singularity: metawrap_container, + params: + rname = "metawrap_genome_assembly", + memlimit = mem2int(cluster["metawrap_genome_assembly"].get('mem', default_memory)), + mh_dir = expand(join(top_assembly_dir, "{name}", "megahit"), name=samples), + assembly_dir = expand(join(top_assembly_dir, "{name}"), name=samples), + contig_min_len = "1000", + threads: int(cluster["metawrap_genome_assembly"].get('threads', default_threads)), + shell: + """ + # remove empty directories by snakemake, to prevent metawrap error + rm -rf {params.mh_dir:q} + # link to the file names metawrap expects + ln -s {input.R1} {output.assembly_R1} + ln -s {input.R2} {output.assembly_R2} + # run genome assembler + mw assembly \ + --megahit \ + --metaspades \ + -m {params.memlimit} \ + -t {threads} \ + -l {params.contig_min_len} \ + -1 {output.assembly_R1} \ + -2 {output.assembly_R2} \ + -o {params.assembly_dir} + """ + + +rule metawrap_tax_classification: + """ + The metaWRAP::Taxonomic Classification module takes in any number of fastq or fasta files, classifies the contained sequences + with KRAKEN, and reports the taxonomy distribution in a kronagram using KronaTools. If the sequences passed to the module + belong to an assembly and follow the contig naming convention of the Assembly module, the taxonomy of each contig is weighted based on + its length and coverage [weight=coverage*length]. The classifications of the sequences are then summarized in a format that + KronaTools' ktImportText function recognizes, and a final kronagram in html format is made with all the samples. 
+ + @Input: + - clean & trimmed reads (R1 & R2) + - ensemble genome assembly + + @Output: + - kraken2 kmer classification reports and tabular data outputs + - krona tabular outputs + - krona plot (interactive circular pie charts) of classified taxonomies + + """ + input: + reads = expand(join(top_trim_dir, "{name}", "{name}_R{pair}_trimmed.fastq"), name=samples, pair=['1', '2']), + final_assembly = expand(join(top_assembly_dir, "{name}", "final_assembly.fasta"), name=samples), + output: + krak2_asm = expand(join(top_tax_dir, "{name}", "final_assembly.krak2"), name=samples), + kraken2_asm = expand(join(top_tax_dir, "{name}", "final_assembly.kraken2"), name=samples), + krona_asm = expand(join(top_tax_dir, "{name}", "final_assembly.krona"), name=samples), + kronagram = expand(join(top_tax_dir, "{name}", "kronagram.html"), name=samples), + params: + tax_dir = expand(join(top_tax_dir, "{name}"), name=samples), + rname = "metawrap_tax_classification", + tax_subsample = str(int(1e6)), + singularity: metawrap_container, + threads: int(cluster["metawrap_tax_classification"].get("threads", default_threads)), + shell: + """ + mkdir -p """+top_tax_dir+""" + mw kraken2 \ + -t {threads} \ + -s {params.tax_subsample} \ + -o {params.tax_dir} \ + {input.final_assembly} \ + {input.reads} + """ + + +rule metawrap_binning: + """ + The metaWRAP::Binning module is meant to be a convenient wrapper around three metagenomic binning software: MaxBin2, metaBAT2, and CONCOCT. + + First the metagenomic assembly is indexed with bwa-index, and then paired end reads from any number of samples are aligned to it. + The alignments are sorted and compressed with samtools, and library insert size statistics are also gathered at the same time + (insert size average and standard deviation). + + metaBAT2's `jgi_summarize_bam_contig_depths` function is used to generate contig adundance table, and it is then converted + into the correct format for each of the three binners to take as input. After MaxBin2, metaBAT2, and CONCOCT finish binning + the contigs with default settings (the user can specify which software he wants to bin with), the final bins folders are + created with formatted bin fasta files for easy inspection. + + Optionally, the user can chose to immediately run CheckM on the bins to determine the success of the binning. + + CheckM's `lineage_wf` function is used to predict essential genes and estimate the completion and + contamination of each bin, and a custom script is used to generate an easy to view report on each bin set. + + Orchestrates execution of this ensemble of metagenomic binning software: + - MaxBin2 + - metaBAT2 + - CONCOCT + + Stepwise algorithm for metawrap genome binning: + 1. Alignment + a. metagenomic assembly is indexed with bwa-index + b. paired end reads from any number of samples are aligned + 2. Collate, sort, stage alignment outputs + a. Alignments are sorted and compressed with samtools, and library insert size statistics are also + gathered at the same time (insert size average and standard deviation). + b. metaBAT2's `jgi_summarize_bam_contig_depths` function is used to generate contig adundance table + i. It is converted into the correct format for each of the three binners to take as input. + 3. Ensemble binning + a. MaxBin2, metaBAT2, CONCOCT are run with default settings + b. Final bins directories are created with formatted bin fasta files for easy inspection. + 4. CheckM is run on the bin output from step 3 to determine the success of the binning. + a. 
CheckM's `lineage_wf` function is used to predict essential genes and estimate the completion and contamination of each bin. + b. Outputs are formatted and collected for better viewing. + + @Input: + Clean trimmed fastq.gz reads (R1 & R2 per sample) + + @Output: + Megahit assembled draft-genomes (bins) and reports + Metaspades assembled draft-genomes (bins) and reports + Ensemble assembled draft-genomes (bins) and reports + + """ + input: + R1 = join(top_trim_dir, "{name}", "{name}_R1_trimmed.fastq"), + R2 = join(top_trim_dir, "{name}", "{name}_R2_trimmed.fastq"), + assembly = join(top_assembly_dir, "{name}", "final_assembly.fasta"), + output: + maxbin_bins = directory(join(top_binning_dir, "{name}", "maxbin2_bins")), + metabat2_bins = directory(join(top_binning_dir, "{name}", "metabat2_bins")), + metawrap_bins = directory(join(top_binning_dir, "{name}", "metawrap_50_5_bins")), + maxbin_contigs = join(top_binning_dir, "{name}", "maxbin2_bins.contigs"), + maxbin_stats = join(top_binning_dir, "{name}", "maxbin2_bins.stats"), + metabat2_contigs = join(top_binning_dir, "{name}", "metabat2_bins.contigs"), + metabat2_stats = join(top_binning_dir, "{name}", "metabat2_bins.stats"), + metawrap_contigs = join(top_binning_dir, "{name}", "metawrap_50_5_bins.contigs"), + metawrap_stats = join(top_binning_dir, "{name}", "metawrap_50_5_bins.stats"), + bin_figure = join(top_binning_dir, "{name}", "figures", "binning_results.png"), + params: + rname = "metawrap_binning", + bin_parent_dir = top_binning_dir, + bin_dir = join(top_binning_dir, "{name}"), + bin_mem = mem2int(cluster['metawrap_binning'].get("mem", default_memory)), + mw_trim_linker_R1 = join(top_trim_dir, "{name}", "{name}_1.fastq"), + mw_trim_linker_R2 = join(top_trim_dir, "{name}", "{name}_2.fastq"), + min_perc_complete = "50", + max_perc_contam = "5", + singularity: metawrap_container, + threads: int(cluster["metawrap_binning"].get("threads", default_threads)), + shell: + """ + # set checkm data + export CHECKM_DATA_PATH="/data2/CHECKM_DB" + + # make base dir if not exists + mkdir -p {params.bin_parent_dir} + if [ -d "{params.bin_dir}" ]; then rm -rf {params.bin_dir}; fi + + # setup links for metawrap input + [[ -f "{params.mw_trim_linker_R1}" ]] || ln -s {input.R1} {params.mw_trim_linker_R1} + [[ -f "{params.mw_trim_linker_R2}" ]] || ln -s {input.R2} {params.mw_trim_linker_R2} + + # metawrap binning + mw binning \ + --metabat2 --maxbin2 --concoct \ + -m {params.bin_mem} \ + -t {threads} \ + -a {input.assembly} \ + -o {params.bin_dir} \ + {params.mw_trim_linker_R1} {params.mw_trim_linker_R2} + + # metawrap bin refinement + mw bin_refinement \ + -o {params.bin_dir} \ + -t {threads} \ + -A {params.bin_dir}/metabat2_bins \ + -B {params.bin_dir}/maxbin2_bins \ + -c {params.min_perc_complete} \ + -x {params.max_perc_contam} + """ + + +rule derep_bins: + """ + dRep is a step which further refines draft-quality genomes (bins) by using a + fast, inaccurate estimation of genome distance, and a slow, accurate measure + of average nucleotide identity. 
+
+
+rule derep_bins:
+    """
+    dRep further refines draft-quality genomes (bins) by combining a fast,
+    approximate estimate of genome distance with a slow, accurate measure
+    of average nucleotide identity.
+
+    @Input:
+        maxbin2 assembly bins, contigs, stat summaries
+        metabat2 assembly bins, contigs, stat summaries
+        metawrap assembly bins, contigs, stat summaries
+
+    @Output:
+        directory of consensus ensemble bins (deterministic output)
+
+    """
+    input:
+        maxbin_contigs = join(top_binning_dir, "{name}", "maxbin2_bins.contigs"),
+        maxbin_stats = join(top_binning_dir, "{name}", "maxbin2_bins.stats"),
+        metabat2_contigs = join(top_binning_dir, "{name}", "metabat2_bins.contigs"),
+        metabat2_stats = join(top_binning_dir, "{name}", "metabat2_bins.stats"),
+        metawrap_contigs = join(top_binning_dir, "{name}", "metawrap_50_5_bins.contigs"),
+        metawrap_stats = join(top_binning_dir, "{name}", "metawrap_50_5_bins.stats"),
+    output:
+        dereplicated_bins = directory(join(top_refine_dir, "{name}", "dereplicated_bins")),
+    singularity: metawrap_container,
+    threads: int(cluster["derep_bins"].get("threads", default_threads)),
+    params:
+        rname = "derep_bins",
+        sid = "{name}",
+        maxbin_bins = join(top_binning_dir, "{name}", "maxbin2_bins"),
+        metabat2_bins = join(top_binning_dir, "{name}", "metabat2_bins"),
+        metawrap_bins = join(top_binning_dir, "{name}", "metawrap_50_5_bins"),
+        # -l LENGTH: Minimum genome length (default: 50000)
+        minimum_genome_length = "1000",
+        # -pa [--P_ani] P_ANI: ANI threshold to form primary (MASH) clusters (default: 0.9)
+        ani_primary_threshold = "0.9",
+        # -sa [--S_ani] S_ANI: ANI threshold to form secondary clusters (default: 0.95)
+        ani_secondary_threshold = "0.95",
+        # -nc [--cov_thresh] COV_THRESH: Minimum level of overlap between genomes when doing secondary comparisons (default: 0.1)
+        min_overlap = "0.1",
+        # -cm [--coverage_method] {total,larger}: Method to calculate coverage of an alignment
+        coverage_method = 'larger',
+    shell:
+        """
+        mkdir -p {output.dereplicated_bins}
+        dRep dereplicate \
+            -g $(ls {params.metawrap_bins}/* | tr '\\n' ' ') \
+            -p {threads} \
+            -l {params.minimum_genome_length} \
+            -pa {params.ani_primary_threshold} \
+            -sa {params.ani_secondary_threshold} \
+            -nc {params.min_overlap} \
+            -cm {params.coverage_method} \
+            {output.dereplicated_bins}
+        """
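To make the shape of the dRep call above explicit: every FASTA in the metaWRAP-refined bin folder is handed to dRep, which first groups genomes into primary clusters with Mash at the -pa threshold and then dereplicates within each cluster at the -sa ANI threshold, keeping one representative per secondary cluster. A rough stand-alone sketch of the same invocation, with placeholder paths and thread count (dRep must already be on PATH, as it is inside the metawrap container):

    import glob
    import subprocess

    # Placeholders standing in for {params.metawrap_bins} and {output.dereplicated_bins}.
    metawrap_bins = "/data/project_id/metawrap_binning/sampleA/metawrap_50_5_bins"
    out_dir = "/data/project_id/metawrap_bin_refine/sampleA/dereplicated_bins"

    genomes = sorted(glob.glob(f"{metawrap_bins}/*"))  # refined bin FASTAs
    subprocess.run(
        [
            "dRep", "dereplicate", out_dir,
            "-g", *genomes,
            "-p", "32",        # processors
            "-l", "1000",      # minimum genome length
            "-pa", "0.9",      # primary (Mash) ANI threshold
            "-sa", "0.95",     # secondary ANI threshold
            "-nc", "0.1",      # minimum overlap between genomes
            "-cm", "larger",   # coverage method
        ],
        check=True,
    )

Passing the genome list through Python (or a quoted shell array) also sidesteps the whitespace-splitting caveat of the ls | tr construction used in the rule.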
join(workpath, "rna", "concatenated_R2.fastq.gz"), + big_read1_hash = join(workpath, "rna", "concatenated_R1.md5"), + big_read2_hash = join(workpath, "rna", "concatenated_R2.md5"), + params: + rname = "concat_rna_reads", + big_read_r1 = join(workpath, "dna", "concatenated_R1.fastq"), + big_read_r2 = join(workpath, "dna", "concatenated_R2.fastq"), + input_dir = workpath, + threads: int(cluster["concat_rna_reads"].get('threads', default_threads)), + shell: + """ + # concat r1 + for fastq in {params.input_dir}/*R1*fastq; do + ext=$(echo "${{fastq: -2}}" | tr '[:upper:]' '[:lower:]') + if [[ "$ext" == "gz" ]]; then + zcat $fastq >> {params.big_read_r1} + else + cat $fastq >> {params.big_read_r1} + fi; + done + + # concat r2 + for fastq in {params.input_dir}/*R2*fastq; do + ext=$(echo "${{fastq: -2}}" | tr '[:upper:]' '[:lower:]') + if [[ "$ext" == "gz" ]]; then + zcat $fastq > {params.big_read_r2} + else + cat $fastq >> {params.big_read_r2} + fi; + done + pigz -9 -p 28 -c {output.big_read_r1} > {output.big_compressed_read_r1} + pigz -9 -p 28 -c {output.big_read_r2} > {output.big_compressed_read_r2} + md5sum {output.big_compressed_read_r1} > {output.big_read1_hash} + md5sum {output.big_compressed_read_r2} > {output.big_read2_hash} + """ + + +rule rna_read_qc: + input: + R1 = join(workpath, "rna", "{rname}_R1.fastq.gz"), + R2 = join(workpath, "rna", "{rname}_R2.fastq.gz"), + output: + R1_pretrim_report = join(top_readqc_dir_rna, "{rname}", "{rname}_R1_pretrim_report.html"), + R2_pretrim_report = join(top_readqc_dir_rna, "{rname}", "{rname}_R2_pretrim_report.html"), + R1_postrim_report = join(top_readqc_dir_rna, "{rname}", "{rname}_R1_postrim_report.html"), + R2_postrim_report = join(top_readqc_dir_rna, "{rname}", "{rname}_R2_postrim_report.html"), + R1_trimmed = join(top_trim_dir_rna, "{rname}", "{rname}_R1_trimmed.fastq"), + R2_trimmed = join(top_trim_dir_rna, "{rname}", "{rname}_R2_trimmed.fastq"), + R1_trimmed_gz = join(top_trim_dir_rna, "{rname}", "{rname}_R1_trimmed.fastq.gz"), + R2_trimmed_gz = join(top_trim_dir_rna, "{rname}", "{rname}_R2_trimmed.fastq.gz"), + params: + rname = "rna_read_qc", + sid = "{rname}", + this_qc_dir = join(top_readqc_dir_rna, "{rname}"), + trim_out = join(top_trim_dir_rna, "{rname}"), + tmp_safe_dir = join(config['options']['tmp_dir'], 'read_qc'), + tmpr1 = lambda _, output, input: join(config['options']['tmp_dir'], 'read_qc', str(basename(str(input.R1))).replace('_R1.', '_1.').replace('.gz', '')), + tmpr2 = lambda _, output, input: join(config['options']['tmp_dir'], 'read_qc', str(basename(str(input.R2))).replace('_R2.', '_2.').replace('.gz', '')), + containerized: metawrap_container, + shell: + """ + # safe temp directory + if [ ! 
-d "{params.tmp_safe_dir}" ]; then mkdir -p "{params.tmp_safe_dir}"; fi + tmp=$(mktemp -d -p "{params.tmp_safe_dir}") + trap 'rm -rf "{params.tmp_safe_dir}"' EXIT + + # uncompress to lscratch + rone="{input.R1}" + ext=$(echo "${{rone: -2}}" | tr '[:upper:]' '[:lower:]') + if [[ "$ext" == "gz" ]]; then + zcat {input.R1} > {params.tmpr1} + else + ln -s {input.R1} {params.tmpr1} + fi; + rtwo="{input.R2}" + ext=$(echo "${{rtwo: -2}}" | tr '[:upper:]' '[:lower:]') + if [[ "$ext" == "gz" ]]; then + zcat {input.R2} > {params.tmpr2} + else + ln -s {input.R2} {params.tmpr2} + fi; + + # read quality control, host removal + # TODO: add support for mouse reads (mm10 genome prefix, "-x mm10") + mw read_qc -1 {params.tmpr1} -2 {params.tmpr2} -t {threads} -o {params.this_qc_dir} + + # collate fastq outputs to facilitate workflow, compress + ln -s {params.this_qc_dir}/final_pure_reads_1.fastq {params.trim_out}/{params.sid}_R1_trimmed.fastq + ln -s {params.this_qc_dir}/final_pure_reads_2.fastq {params.trim_out}/{params.sid}_R2_trimmed.fastq + pigz -9 -p {threads} -c {params.this_qc_dir}/final_pure_reads_1.fastq > {params.trim_out}/{params.sid}_R1_trimmed.fastq.gz + pigz -9 -p {threads} -c {params.this_qc_dir}/final_pure_reads_2.fastq > {params.trim_out}/{params.sid}_R2_trimmed.fastq.gz + + # collate outputs to facilitate + ln -s {params.this_qc_dir}/post-QC_report/final_pure_reads_1_fastqc.html {params.this_qc_dir}/{params.sid}_R1_postrim_report.html + ln -s {params.this_qc_dir}/post-QC_report/final_pure_reads_2_fastqc.html {params.this_qc_dir}/{params.sid}_R2_postrim_report.html + ln -s {params.this_qc_dir}/pre-QC_report/{params.sid}_1_fastqc.html {params.this_qc_dir}/{params.sid}_R1_pretrim_report.html + ln -s {params.this_qc_dir}/pre-QC_report/{params.sid}_2_fastqc.html {params.this_qc_dir}/{params.sid}_R2_pretrim_report.html + """ diff --git a/workflow/rules/hooks.smk b/workflow/rules/hooks.smk index 3a68818..647624b 100644 --- a/workflow/rules/hooks.smk +++ b/workflow/rules/hooks.smk @@ -56,10 +56,13 @@ if config["options"]["mode"] == "slurm": # is run and no child jobs are submitted touch failed_jobs_${{timestamp}}.tsv }} - touch COMPLETED + failed_wc=$(wc -l failed_jobs_${{timestamp}}.tsv); + if [ "$failed_wc" -le "1" ]; then + rm failed_jobs_${{timestamp}}.tsv; + fi + touch COMPLETED """ ) - onerror: shell( """ diff --git a/workflow/rules/metawrap.smk b/workflow/rules/metawrap.smk deleted file mode 100644 index e10d5d0..0000000 --- a/workflow/rules/metawrap.smk +++ /dev/null @@ -1,286 +0,0 @@ -# ~~~~~~~~~~ -# Metawrap metagenome assembly and analysis rules -# ~~~~~~~~~~ -from os.path import join -from itertools import chain - - -# ~~~~~~~~~~ -# Constants and paths -# ~~~~~~~~~~ -workpath = config["project"]["workpath"] -datapath = config["project"]["datapath"] -default_threads = cluster["__default__"]['threads'] -default_memory = cluster["__default__"]['mem'] -top_log_dir = join(workpath, config['project']['id'], "logs") -top_readqc_dir = join(workpath, config['project']['id'], "metawrap_qc") -top_assembly_dir = join(workpath, config['project']['id'], "metawrap_assembly") -top_tax_dir = join(workpath, config['project']['id'], "metawrap_kmer") -top_binning_dir = join(workpath, config['project']['id'], "metawrap_binning") -top_refine_dir = join(workpath, config['project']['id'], "metawrap_bin_refine") -metawrap_container = config["containers"]["metawrap"] - - -""" - 1. read qc - 2. assembly - 3. binning - 4. bin refinement - 5. depreplicate bins - 6. annotate bins - 7. 
index depreplicated genomes - 8. align DNA to assembly -""" - - -rule metawrap_read_qc: - """ - Metawrap read quality control rule for producing high quality reads for assembly. - - Quality-control step accomplishes three tasks: - - Trims adapter sequences from input read pairs (.fastq.gz) - - Quality filters from input read pairs - - Removes human contamination from input read pairs - @Container requirements - - [MetaWrap](https://github.com/bxlab/metaWRAP) orchestrates execution of - these tools in a psuedo-pipeline: - - [FastQC](https://github.com/s-andrews/FastQC) - - [bmtagger](ftp://ftp.ncbi.nlm.nih.gov/pub/agarwala/bmtagger/) - - [Trim-Galore](https://github.com/FelixKrueger/TrimGalore) - - [Cutadapt](https://github.com/marcelm/cutadapt) - - @Environment specifications - - minimum 16gb memory - - will load bmtagger index in memory (8gb) - @Input: - - Raw fastq.gz reads (R1 & R2 per-sample) from the instrument - - @Outputs: - - Trimmed, quality filtered reads (R1 & R2) - - FastQC html report and zip file on trimmed data - """ - input: - R1 = join(datapath, "{name}_R1.fastq.gz"), - R2 = join(datapath, "{name}_R2.fastq.gz"), - output: - R1_bmtagger_report = join(top_readqc_dir, "{name}", "{name}.R1_bmtagger_report.html"), - R2_bmtagger_report = join(top_readqc_dir, "{name}", "{name}.R2_bmtagger_report.html"), - R1_fastqc_report = join(top_readqc_dir, "{name}", "{name}.R1_fastqc_report.html"), - R2_fastqc_report = join(top_readqc_dir, "{name}", "{name}.R2_fastqc_report.html"), - R1_qc_reads = join(workpath, "{name}", "{name}.R1_readqc.fastq"), - R2_qc_reads = join(workpath, "{name}", "{name}.R2_readqc.fastq"), - params: - rname = "metawrap_read_qc", - this_qc_dir = join(top_readqc_dir, "{name}"), - R1_mw_named = join(top_readqc_dir, "{name}", "{name}_1.fastq"), - R2_mw_named = join(top_readqc_dir, "{name}", "{name}_2.fastq"), - containerized: metawrap_container, - log: join(top_log_dir, "read_qc", "{name}") - threads: int(cluster["metawrap_genome_assembly"].get('threads', default_threads)), - shell: - """ - gunzip -c {input.R1} > {params.R1_mw_named} - gunzip -c {input.R2} > {params.R2_mw_named} - metawrap read_qc -1 {params.R1_mw_named} -2 {params.R2_mw_named} -t {threads} -o {params.this_qc_dir} - mv {params.this_qc_dir}/final_pure_reads_1.fastq {output.R1_qc_reads} - rm -f {params.R1_mw_named} - cp {params.this_qc_dir}/post-QC_report/final_pure_reads_1_fastqc.html {output.R1_bmtagger_report} - cp {params.this_qc_dir}/pre-QC_report/{params.R1_mw_named}_fastqc.html {output.R1_fastqc_report} - mv {params.this_qc_dir}/final_pure_reads_2.fastq {output.R2_qc_reads} - rm -f {params.R2_mw_named} - cp {params.this_qc_dir}/post-QC_report/final_pure_reads_2_fastqc.html {output.R2_bmtagger_report} - cp {params.this_qc_dir}/pre-QC_report/{params.R2_mw_named}_fastqc.html {output.R2_fastqc_report} - """ - - -rule metawrap_genome_assembly: - """ - TODO - @Input: - Clean trimmed fastq.gz reads (R1 & R2 per sample) - @Output: - Megahit assembled contigs and reports - Metaspades assembled contigs and reports - Ensemble assembled contigs and reports - """ - input: - R1 = expand(join(workpath, "{name}", "{name}.R1_readqc.fastq"), name=samples), - R2 = expand(join(workpath, "{name}", "{name}.R2_readqc.fastq"), name=samples), - output: - # megahit outputs - megahit_assembly = expand(join(top_assembly_dir, "{name}", "megahit", "final.contigs.fa"), name=samples), - megahit_longcontigs = expand(join(top_assembly_dir, "{name}", "megahit", "long.contigs.fa"), name=samples), - megahit_log = 
expand(join(top_assembly_dir, "{name}", "megahit", "log"), name=samples), - # metaspades outsputs - metaspades_assembly = expand(join(top_assembly_dir, "{name}", "metaspades", "contigs.fasta"), name=samples), - metaspades_graph = expand(join(top_assembly_dir, "{name}", "metaspades", "assembly_graph.fastg"), name=samples), - metaspades_longscaffolds = expand(join(top_assembly_dir, "{name}", "metaspades", "long_scaffolds.fasta"), name=samples), - metaspades_scaffolds = expand(join(top_assembly_dir, "{name}", "metaspades", "scaffolds.fasta"), name=samples), - metaspades_cor_readsr1 = expand(join(top_assembly_dir, "{name}", "metaspades", "{name}_1.fastq.00.0_0.cor.fastq.gz"), name=samples), - metaspades_cor_readsr2 = expand(join(top_assembly_dir, "{name}", "metaspades", "corrected", "{name}_2.fastq.00.0_0.cor.fastq.gz"), name=samples), - # ensemble outputs - final_assembly = expand(join(top_assembly_dir, "{name}", "final_assembly.fasta"), name=samples), - final_assembly_report = expand(join(top_assembly_dir, "{name}", "assembly_report.html"), name=samples), - assembly_R1 = expand(join(top_assembly_dir, "{name}", "{name}_1.fastq"), name=samples), - assembly_R2 = expand(join(top_assembly_dir, "{name}", "{name}_2.fastq"), name=samples), - assembly_dir = expand(join(top_assembly_dir, "{name}"), name=samples), - singularity: metawrap_container, - params: - rname = "metawrap_genome_assembly", - memlimit = cluster["metawrap_genome_assembly"].get('mem', default_memory), - contig_min_len = "1000", - threads: int(cluster["metawrap_genome_assembly"].get('threads', default_threads)), - shell: - """ - # link to the file names metawrap expects - ln -s {input.R1} {output.assembly_R1} - ln -s {input.R2} {output.assembly_R2} - # run genome assembler - metawrap assembly \ - --megahit \ - --metaspades \ - -m {params.memlimit} \ - -t {threads} \ - -l {params.contig_min_len} \ - -1 {output.assembly_R1} \ - -2 {output.assembly_R2} \ - -o {output.assembly_dir} - """ - - -rule metawrap_tax_classification: - """ - TODO: docstring - """ - input: - reads = expand(join(workpath, "{name}", "{name}.R{pair}_readqc.fastq"), name=samples, pair=['1', '2']), - final_assembly = expand(join(top_assembly_dir, "{name}", "final_assembly.fasta"), name=samples), - output: - krak2_asm = expand(join(top_tax_dir, "{name}", "final_assembly.krak2"), name=samples), - kraken2_asm = expand(join(top_tax_dir, "{name}", "final_assembly.kraken2"), name=samples), - krona_asm = expand(join(top_tax_dir, "{name}", "final_assembly.krona"), name=samples), - kronagram = expand(join(top_tax_dir, "{name}", "kronagram.html"), name=samples), - tax_dir = expand(join(top_tax_dir, "{name}"), name=samples), - params: - rname = "metawrap_tax_classification", - tax_subsample = str(int(1e6)), - singularity: metawrap_container, - threads: cluster["metawrap_tax_classification"].get("threads", default_threads), - shell: - """ - metawrap kraken2 \ - -t {threads} \ - -s {params.tax_subsample} \ - -o {output.tax_dir} \ - {input.final_assembly} \ - {input.reads} - """ - - -rule metawrap_setup_binning: - input: - R1_from_qc = join(workpath, "{name}", "{name}.R1_readqc.fastq"), - R2_from_qc = join(workpath, "{name}", "{name}.R2_readqc.fastq"), - output: - R1_bin_name = join(workpath, "{name}", "{name}_1.fastq"), - R2_bin_name = join(workpath, "{name}", "{name}_2.fastq"), - params: - rname = "metawrap_setup_binning", - shell: - """ - ln -s {input.R1_from_qc} {output.R1_bin_name} - ln -s {input.R2_from_qc} {output.R2_bin_name} - """ - - -rule metawrap_binning: - input: - 
r1_read = join(workpath, "{name}", "{name}_1.fastq"), - r2_read = join(workpath, "{name}", "{name}_2.fastq"), - assembly = join(top_assembly_dir, "{name}", "final_assembly.fasta"), - output: - maxbin_bins = directory(join(top_binning_dir, "{name}", "maxbin2_bins")), - metabat2_bins = directory(join(top_binning_dir, "{name}", "metabat2_bins")), - metawrap_bins = directory(join(top_binning_dir, "{name}", "metawrap_50_5_bins")), - maxbin_contigs = join(top_binning_dir, "{name}", "maxbin2_bins.contigs"), - maxbin_stats = join(top_binning_dir, "{name}", "maxbin2_bins.stats"), - metabat2_contigs = join(top_binning_dir, "{name}", "metabat2_bins.contigs"), - metabat2_stats = join(top_binning_dir, "{name}", "metabat2_bins.stats"), - metawrap_contigs = join(top_binning_dir, "{name}", "metawrap_50_5_bins.contigs"), - metawrap_stats = join(top_binning_dir, "{name}", "metawrap_50_5_bins.stats"), - bin_figure = join(top_binning_dir, "{name}", "figures", "binning_results.png"), - params: - rname = "metawrap_binning", - bin_dir = join(top_binning_dir, "{name}"), - refine_dir = join(top_refine_dir, "{name}"), - bin_mem = cluster.get("metawrap_assembly_binning", default_memory), - min_perc_complete = "50", - max_perc_contam = "5", - singularity: metawrap_container, - threads: cluster["metawrap_tax_classification"].get("threads", default_threads), - shell: - """ - # metawrap binning - metawrap binning \ - --metabat2 --maxbin2 --concoct \ - -m {params.bin_mem} \ - -t {threads} \ - -a {input.assembly} \ - -o {params.bin_dir} \ - {input.r1_read} {input.r2_read} - # metawrap bin refinement - metawrap bin_refinement \ - -o {params.refine_dir} \ - -t {threads} \ - -A {params.bin_dir}/metabat2_bins \ - -B {params.bin_dir}/maxbin2_bins \ - -c {params.min_perc_complete} \ - -x {params.max_perc_contam} - """ - - -rule derep_bins: - input: - maxbin_contigs = join(top_binning_dir, "{name}", "maxbin2_bins.contigs"), - maxbin_stats = join(top_binning_dir, "{name}", "maxbin2_bins.stats"), - metabat2_contigs = join(top_binning_dir, "{name}", "metabat2_bins.contigs"), - metabat2_stats = join(top_binning_dir, "{name}", "metabat2_bins.stats"), - metawrap_contigs = join(top_binning_dir, "{name}", "metawrap_50_5_bins.contigs"), - metawrap_stats = join(top_binning_dir, "{name}", "metawrap_50_5_bins.stats"), - output: - dereplicated_bins = directory(join(top_refine_dir, "{name}", "dereplicated_bins")), - search_rep_bc = join(top_binning_dir, "{name}", "maxbin2_bins"), - singularity: metawrap_container, - threads: 32 - params: - rname = "derep_bins", - sid = "{name}", - maxbin_bins = join(top_binning_dir, "{name}", "maxbin2_bins"), - metabat2_bins = join(top_binning_dir, "{name}", "metabat2_bins"), - metawrap_bins = join(top_binning_dir, "{name}", "metawrap_50_5_bins"), - # -l LENGTH: Minimum genome length (default: 50000) - minimum_genome_length = "1000", - # -pa[--P_ani] P_ANI: ANI threshold to form primary (MASH) clusters (default: 0.9) - ani_primary_threshold = "0.9", - # -sa[--S_ani] S_ANI: ANI threshold to form secondary clusters (default: 0.95) - ani_secondary_threshold = "0.95", - # -nc[--cov_thresh] COV_THRESH: Minmum level of overlap between genomes when doing secondary comparisons (default: 0.1) - min_overlap = "0.1", - # -cm[--coverage_method] {total,larger} {total,larger}: Method to calculate coverage of an alignment - coverage_method = 'larger', - shell: - """ - sed -i 's/^bin./{name}_bin./g' {input.maxbin_stats} && \ - sed -i 's/^bin./{name}_bin./g' {input.metabat2_stats} && \ - sed -i 's/^bin./{name}_bin./g' 
{input.metawrap_stats} && \ - touch {output.search_rep_bc} - dRep dereplicate \ - -p {threads} \ - -l {params.minimum_genome_length} \ - -pa {params.ani_primary_threshold} \ - -sa {params.ani_secondary_threshold} \ - -nc {params.min_overlap} \ - -cm {params.coverage_method} \ - -g {params.metawrap_bins}/* \ - {output.dereplicated_bins} - """ diff --git a/workflow/scripts/common.py b/workflow/scripts/common.py index 92f9d23..5e660a2 100644 --- a/workflow/scripts/common.py +++ b/workflow/scripts/common.py @@ -65,6 +65,8 @@ def allocated(resource, rule, lookup, default="__default__"): def str_bool(s): """ + Deprecated from stdlib in 3.10, see: https://peps.python.org/pep-0632/ + Converts a string to boolean. It is dangerous to try to typecast a string into a boolean value using the built-in `bool()` function. This function avoids any issues that can @@ -74,6 +76,8 @@ def str_bool(s): boolean('False') returns False boolean('asdas') raises TypeError """ + if not isinstance(s, str): + s = str(s) val = s.lower() if val in ['true', '1', 'y', 'yes']: return True @@ -82,4 +86,13 @@ def str_bool(s): else: # Provided value could not be # type casted into a boolean - raise TypeError('Fatal: cannot type cast {} into a boolean'.format(val)) \ No newline at end of file + raise TypeError('Fatal: cannot type cast {} into a boolean'.format(val)) + + +def list_bool(l): + # some lists are strings of "None" if no files input + # type checking instead of string manip + if isinstance(l, list): + if len(l) > 0: + return True + return False \ No newline at end of file
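For reference, a short usage sketch of the two helpers touched above, assuming it is run from a location where scripts.common is importable (as it is in workflow/rules/RNA.smk): str_bool normalizes truthy/falsey config strings, while list_bool reports whether a config key resolved to a non-empty list rather than a placeholder string such as "None".

    from scripts.common import str_bool, list_bool

    print(str_bool("True"))          # True
    print(str_bool(1))               # True   (non-str input is cast to str first)
    print(list_bool(["rna_S1"]))     # True   (non-empty list of RNA sample stems)
    print(list_bool("None"))         # False  (string placeholder, not a list)
    print(list_bool([]))             # False  (empty list)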