feat: add flag to set snakemake rerun triggers
rroutsong committed Aug 9, 2024
1 parent 17a982b commit dd7d4e8
Showing 4 changed files with 105 additions and 71 deletions.
28 changes: 15 additions & 13 deletions metamorph
@@ -50,16 +50,9 @@ import argparse, sys, os, subprocess, json, textwrap

# Local imports
from src import version
from src.run import init, setup, bind, dryrun, runner, valid_input
from src.utils import (
Colors,
err,
exists,
fatal,
check_cache,
require,
permissions
)
from src.run import init, setup, bind, dryrun, runner
from src.utils import Colors, err, exists, fatal, check_cache, require, \
permissions, valid_trigger, valid_input


# Pipeline Metadata
@@ -190,6 +183,7 @@ def run(sub_args):
if 'databases' in config:
bindpaths.extend([mount['from']+':'+mount['to']+':'+mount['mode'] for mount in config['databases']])

triggers = sub_args.triggers if sub_args.triggers else None
mjob = runner(mode = sub_args.mode,
outdir = sub_args.output,
alt_cache = sub_args.singularity_cache,
@@ -199,6 +193,7 @@
logger = logfh,
additional_bind_paths = ",".join(bindpaths),
tmp_dir = sub_args.tmp_dir,
triggers = triggers
)

# Step 6. Wait for subprocess to complete,
@@ -466,9 +461,6 @@ def parsed_arguments(name, description):
action='help',
help=argparse.SUPPRESS
)

# Analysis options
# ... add here

# Orchestration Options
# Execution Method, run locally
@@ -492,6 +484,16 @@
help = argparse.SUPPRESS
)

# Snakemake rerun triggers
subparser_run.add_argument(
'-t', '--triggers',
type = valid_trigger,
required = False,
default = None,
nargs="*",
help = argparse.SUPPRESS
)

# Dry-run
# Do not execute the workflow,
# prints what steps remain
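The hunk above wires a new '-t/--triggers' option into the run subparser. Below is a minimal, self-contained sketch (stand-in names, not the pipeline's own code) of how an option declared with type=valid_trigger and nargs='*' behaves: argparse applies the validator to each supplied value and stores them as a list, falling back to None when the flag is omitted.

    import argparse

    SNAKEMAKE_TRIGGERS = ('mtime', 'params', 'input', 'software-env', 'code')

    def valid_trigger(value):
        # Simplified stand-in for src.utils.valid_trigger: reject unknown trigger names early.
        if value not in SNAKEMAKE_TRIGGERS:
            raise argparse.ArgumentTypeError(
                'Invalid trigger. Please use only one of: ' + ', '.join(SNAKEMAKE_TRIGGERS))
        return value

    parser = argparse.ArgumentParser()
    parser.add_argument('-t', '--triggers', type=valid_trigger,
                        required=False, default=None, nargs='*')

    print(parser.parse_args(['--triggers', 'mtime', 'code']).triggers)  # ['mtime', 'code']
    print(parser.parse_args([]).triggers)                               # None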
65 changes: 11 additions & 54 deletions src/run.py
@@ -5,7 +5,7 @@
from __future__ import print_function
from shutil import copytree
from pathlib import Path
from csv import DictReader, Sniffer

import os, re, json, sys, subprocess, argparse


@@ -664,53 +664,6 @@ def dryrun(outdir, config='config.json', snakefile=os.path.join('workflow', 'Sna
return dryrun_output


def valid_input(sheet):
"""
Valid sample sheets should contain two columns: "DNA" and "RNA"
_________________
| DNA | RNA |
|---------------|
pair1 | path | path |
pair2 | path | path |
"""
# check file permissions
sheet = os.path.abspath(sheet)
if not os.path.exists(sheet):
raise argparse.ArgumentTypeError(f'Sample sheet path {sheet} does not exist!')
if not os.access(sheet, os.R_OK):
raise argparse.ArgumentTypeError(f"Path `{sheet}` exists, but cannot read path due to permissions!")

# check format to make sure it's correct
if sheet.endswith('.tsv') or sheet.endswith('.txt'):
delim = '\t'
elif sheet.endswith('.csv'):
delim = '\t'

rdr = DictReader(open(sheet, 'r'), delimiter=delim)

if 'DNA' not in rdr.fieldnames:
raise argparse.ArgumentTypeError("Sample sheet does not contain `DNA` column")
if 'RNA' not in rdr.fieldnames:
print("-- Running in DNA only mode --")
else:
print("-- Running in paired DNA & RNA mode --")

data = [row for row in rdr]
RNA_included = False
for row in data:
row['DNA'] = os.path.abspath(row['DNA'])
if not os.path.exists(row['DNA']):
raise argparse.ArgumentTypeError(f"Sample sheet path `{row['DNA']}` does not exist")
if 'RNA' in row and not row['RNA'] in ('', None, 'None'):
RNA_included = True
row['RNA'] = os.path.abspath(row['RNA'])
if not os.path.exists(row['RNA']):
raise argparse.ArgumentTypeError(f"Sample sheet path `{row['RNA']}` does not exist")

return data, RNA_included


try:
__job_name__ = 'metamorph_' + os.getlogin() + ':master'
except OSError:
@@ -726,6 +679,7 @@ def runner(
threads=2,
jobname=__job_name__,
submission_script='run.sh',
triggers=None,
tmp_dir = '/lscratch/$SLURM_JOB_ID/'
):
"""Runs the pipeline via selected executor: local or slurm.
@@ -833,11 +787,14 @@
# --cluster "${CLUSTER_OPTS}" --keep-going --restart-times 3 -j 500 \
# --rerun-incomplete --stats "$3"/logfiles/runtime_statistics.json \
# --keep-remote --local-cores 30 2>&1 | tee -a "$3"/logfiles/master.log
masterjob = subprocess.Popen([
str(submission_script), mode,
'-j', jobname, '-b', str(bindpaths),
'-o', str(outdir), '-c', str(cache),
'-t', "'{}'".format(tmp_dir)
], cwd = outdir, stderr=subprocess.STDOUT, stdout=logger, env=my_env)
cmd = [
str(submission_script), mode,
'-j', jobname, '-b', str(bindpaths),
'-o', str(outdir), '-c', str(cache),
'-t', "'{}'".format(tmp_dir),
]
if triggers:
cmd.extend(['-r', ','.join(triggers)])
masterjob = subprocess.Popen(cmd, cwd = outdir, stderr=subprocess.STDOUT, stdout=logger, env=my_env)

return masterjob
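For context, the change above threads the optional trigger list into the command that launches run.sh. The sketch below (placeholder paths and names, not the actual defaults) shows the shape of the resulting argument list: the triggers are comma-joined so run.sh receives them as a single '-r' value.

    def build_submission_cmd(mode, jobname, bindpaths, outdir, cache, tmp_dir, triggers=None):
        # Mirrors the list handed to subprocess.Popen in runner(); illustrative only.
        cmd = [
            'run.sh', mode,
            '-j', jobname, '-b', bindpaths,
            '-o', outdir, '-c', cache,
            '-t', "'{}'".format(tmp_dir),
        ]
        if triggers:
            cmd.extend(['-r', ','.join(triggers)])   # e.g. '-r mtime,code'
        return cmd

    print(build_submission_cmd('slurm', 'metamorph:master', '/data:/data:rw',
                               '/results', '/results/.singularity',
                               '/lscratch/$SLURM_JOB_ID/', triggers=['mtime', 'code']))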
26 changes: 22 additions & 4 deletions src/run.sh
@@ -8,7 +8,8 @@ USAGE:
-o OUTDIR \\
-j MASTER_JOB_NAME \\
-b SINGULARITY_BIND_PATHS \\
-t TMP_DIR
-t TMP_DIR \\
-r RERUN_TRIGGERS
SYNOPSIS:
This script creates/submits the pipeline's master job to the
cluster. The master job acts as the pipeline's main controller or
@@ -61,7 +62,12 @@ Required Arguments:
this location. On Biowulf, it should be
set to '/lscratch/\$SLURM_JOBID/'. On FRCE,
this value should be set to the following:
'/scratch/cluster_scratch/\$USER/'.
'/scratch/cluster_scratch/\$USER/'.
-r, --triggers [Type: Str] Snakemake rerun triggers. See
the description of the '--rerun-triggers' flag at
https://snakemake.readthedocs.io/en/stable/executing/cli.html#all-options
for more details.
Default: code params software_env input mtime
OPTIONS:
-c, --cache [Type: Path] Path to singularity cache. If not provided,
the path will default to the current working
@@ -97,6 +103,7 @@ function parser() {
-t | --tmp-dir) provided "$key" "${2:-}"; Arguments["t"]="$2"; shift; shift;;
-o | --outdir) provided "$key" "${2:-}"; Arguments["o"]="$2"; shift; shift;;
-c | --cache) provided "$key" "${2:-}"; Arguments["c"]="$2"; shift; shift;;
-r | --triggers) provided "$key" "${2:-}"; Arguments["r"]="$2"; shift; shift;;
-* | --*) err "Error: Failed to parse unsupported argument: '${key}'."; usage && exit 1;;
*) err "Error: Failed to parse unrecognized argument: '${key}'. Do any of your inputs have spaces?"; usage && exit 1;;
esac
@@ -159,6 +166,7 @@ function submit(){
# INPUT $4 = Singularity Bind paths
# INPUT $5 = Singularity cache directory
# INPUT $6 = Temporary directory for output files
# INPUT $7 = rerun trigger values

# Check if singularity and snakemake are in $PATH
# If not, try to module load singularity as a last resort
@@ -191,6 +199,11 @@
# --printshellcmds --keep-going --rerun-incomplete
# --keep-remote --restart-times 3 -j 500 --use-singularity
# --singularity-args -B {}.format({bindpaths}) --local-cores 24
triggers="${7:-'{code,params,software_env,input,mtime}'}"
if [[ ! ${triggers:0:1} == "{" ]] ; then triggers="{$triggers"; fi
if [[ ! ${triggers:0-1} == "}" ]] ; then triggers+='}'; fi
rerun="--rerun-triggers $triggers"

SLURM_DIR="$3/logfiles/slurmfiles"
CLUSTER_OPTS="sbatch --gres {cluster.gres} --cpus-per-task {cluster.threads} -p {cluster.partition} -t {cluster.time} --mem {cluster.mem} --job-name={params.rname} -e $SLURM_DIR/slurm-%j_{params.rname}.out -o $SLURM_DIR/slurm-%j_{params.rname}.out"
# Check if NOT running on Biowulf
@@ -228,6 +241,7 @@ snakemake \\
-s "$3/workflow/Snakefile" \\
-d "$3" \\
--use-singularity \\
$rerun \\
--singularity-args "\\-c \\-B '$4'" \\
--use-envmodules \\
--verbose \\
@@ -279,9 +293,9 @@ function main(){

# Parses remaining user provided command-line arguments
parser "${@:2}" # Remove first item of list

outdir="$(abspath "$(dirname "${Arguments[o]}")")"
Arguments[o]="${Arguments[o]%/}" # clean outdir path (remove trailing '/')

# Setting defaults for non-required arguments
# If singularity cache not provided, default to ${outdir}/.singularity
cache="${Arguments[o]}/.singularity"
@@ -294,7 +308,11 @@

# Run pipeline and submit jobs to cluster using the defined executor
mkdir -p "${Arguments[o]}/logfiles/"
job_id=$(submit "${Arguments[e]}" "${Arguments[j]}" "${Arguments[o]}" "${Arguments[b]}" "${Arguments[c]}" "${Arguments[t]}")
if [[ ! -v Arguments[r] ]] ; then
job_id=$(submit "${Arguments[e]}" "${Arguments[j]}" "${Arguments[o]}" "${Arguments[b]}" "${Arguments[c]}" "${Arguments[t]}")
else
job_id=$(submit "${Arguments[e]}" "${Arguments[j]}" "${Arguments[o]}" "${Arguments[b]}" "${Arguments[c]}" "${Arguments[t]}" "${Arguments[r]}")
fi
echo -e "[$(date)] Pipeline submitted to cluster.\nMaster Job ID: $job_id"
echo "${job_id}" > "${Arguments[o]}/logfiles/mjobid.log"

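The run.sh changes above accept the comma-separated '-r' value, fall back to the script's documented default when it is absent, and wrap the list in braces before appending '--rerun-triggers' to the generated snakemake command. A rough Python equivalent of that normalization (ignoring the shell-quoting details, and keeping 'software_env' as written in the script) might look like this:

    def rerun_flag(value=None):
        # Default mirrors the run.sh help text; illustrative only, not the script itself.
        triggers = value or 'code,params,software_env,input,mtime'
        if not triggers.startswith('{'):
            triggers = '{' + triggers
        if not triggers.endswith('}'):
            triggers = triggers + '}'
        return '--rerun-triggers ' + triggers

    print(rerun_flag('mtime,code'))   # --rerun-triggers {mtime,code}
    print(rerun_flag())               # --rerun-triggers {code,params,software_env,input,mtime}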
57 changes: 57 additions & 0 deletions src/utils.py
@@ -6,6 +6,7 @@
from shutil import copytree
import os, sys, hashlib
import subprocess, json
from csv import DictReader


def md5sum(filename, first_block_only = False, blocksize = 65536):
@@ -370,6 +371,62 @@ def hashed(l):
return h


def valid_input(sheet):
from argparse import ArgumentTypeError
"""
Valid sample sheets should contain two columns: "DNA" and "RNA"
_________________
| DNA | RNA |
|---------------|
pair1 | path | path |
pair2 | path | path |
"""
# check file permissions
sheet = os.path.abspath(sheet)
if not os.path.exists(sheet):
raise ArgumentTypeError(f'Sample sheet path {sheet} does not exist!')
if not os.access(sheet, os.R_OK):
raise ArgumentTypeError(f"Path `{sheet}` exists, but cannot read path due to permissions!")

# check format to make sure it's correct
if sheet.endswith('.tsv') or sheet.endswith('.txt'):
delim = '\t'
elif sheet.endswith('.csv'):
delim = ','

rdr = DictReader(open(sheet, 'r'), delimiter=delim)

if 'DNA' not in rdr.fieldnames:
raise ArgumentTypeError("Sample sheet does not contain `DNA` column")
if 'RNA' not in rdr.fieldnames:
print("-- Running in DNA only mode --")
else:
print("-- Running in paired DNA & RNA mode --")

data = [row for row in rdr]
RNA_included = False
for row in data:
row['DNA'] = os.path.abspath(row['DNA'])
if not os.path.exists(row['DNA']):
raise ArgumentTypeError(f"Sample sheet path `{row['DNA']}` does not exist")
if 'RNA' in row and not row['RNA'] in ('', None, 'None'):
RNA_included = True
row['RNA'] = os.path.abspath(row['RNA'])
if not os.path.exists(row['RNA']):
raise ArgumentTypeError(f"Sample sheet path `{row['RNA']}` does not exist")

return data, RNA_included
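valid_input() above expects a delimited sheet with a required DNA column and an optional RNA column, with every listed path readable on disk. The snippet below is illustrative only (temporary files stand in for real data) and shows a sheet of that shape being written and read back with csv.DictReader:

    import csv, os, tempfile

    tmp = tempfile.mkdtemp()
    dna = os.path.join(tmp, 'pair1_dna.fastq.gz')
    rna = os.path.join(tmp, 'pair1_rna.fastq.gz')
    for path in (dna, rna):
        open(path, 'w').close()                      # empty placeholder files

    sheet = os.path.join(tmp, 'samples.tsv')
    with open(sheet, 'w', newline='') as fh:
        writer = csv.DictWriter(fh, fieldnames=['DNA', 'RNA'], delimiter='\t')
        writer.writeheader()
        writer.writerow({'DNA': dna, 'RNA': rna})

    with open(sheet) as fh:
        rows = list(csv.DictReader(fh, delimiter='\t'))
    print(rows[0])   # both paths exist, so a sheet like this would pass valid_input()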


def valid_trigger(trigger_given):
from argparse import ArgumentTypeError
snk_triggers = ('mtime', 'code', 'input', 'params', 'software-env')
if trigger_given not in snk_triggers:
raise ArgumentTypeError('Invalid trigger selected. Please use only one of: ' + ', '.join(snk_triggers))
return trigger_given


if __name__ == '__main__':
# Calculate MD5 checksum of entire file
print('{} {}'.format(md5sum(sys.argv[0]), sys.argv[0]))
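As a quick usage check (assumed to be run from the repository root so that src is importable), valid_trigger() returns any of Snakemake's five rerun-trigger names unchanged and raises ArgumentTypeError for anything else:

    from argparse import ArgumentTypeError
    from src.utils import valid_trigger

    for value in ('mtime', 'code', 'input', 'params', 'software-env'):
        assert valid_trigger(value) == value

    try:
        valid_trigger('checksum')        # not a recognized rerun trigger
    except ArgumentTypeError as exc:
        print(exc)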
