diff --git a/bcl2fastq/bcl2fastq.py b/bcl2fastq/bcl2fastq.py
index 80fd786..2707f50 100755
--- a/bcl2fastq/bcl2fastq.py
+++ b/bcl2fastq/bcl2fastq.py
@@ -26,10 +26,14 @@
     os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "lib"))
 if LIB_PATH not in sys.path:
     sys.path.insert(0, LIB_PATH)
-from pipelines import get_pipeline_version, get_site
+from pipelines import get_pipeline_version
+from pipelines import get_site
+from pipelines import get_default_queue
 from pipelines import PipelineHandler
-from pipelines import get_machine_run_flowcell_id, is_devel_version
-from pipelines import logger as aux_logger, generate_timestamp
+from pipelines import get_machine_run_flowcell_id
+from pipelines import is_devel_version
+from pipelines import logger as aux_logger
+from pipelines import generate_timestamp
 from pipelines import get_cluster_cfgfile
 from generate_bcl2fastq_cfg import MUXINFO_CFG, SAMPLESHEET_CSV, USEBASES_CFG, MuxUnit
@@ -50,10 +54,6 @@
 # same as folder name. also used for cluster job names
 PIPELINE_NAME = "bcl2fastq"
 
-DEFAULT_SLAVE_Q = {'GIS': None,
-                   'NSCC': 'production'}
-DEFAULT_MASTER_Q = {'GIS': None,
-                    'NSCC': 'production'}
 
 OUTDIR_BASE = {
     'GIS': {
@@ -204,11 +204,10 @@ def main():
         help="Give this analysis run a name (used in email and report)")
     parser.add_argument('--no-mail', action='store_true',
                         help="Don't send mail on completion")
-    site = get_site()
-    default = DEFAULT_SLAVE_Q.get(site, None)
+    default = get_default_queue('slave')
     parser.add_argument('-w', '--slave-q', default=default,
                         help="Queue to use for slave jobs (default: {})".format(default))
-    default = DEFAULT_MASTER_Q.get(site, None)
+    default = get_default_queue('master')
     parser.add_argument('-m', '--master-q', default=default,
                         help="Queue to use for master job (default: {})".format(default))
     parser.add_argument('-l', '--lanes', type=int, nargs="*",
@@ -385,7 +384,7 @@ def main():
     pipeline_handler = PipelineHandler(
         PIPELINE_NAME, PIPELINE_BASEDIR,
-        outdir, user_data, site=site,
+        outdir, user_data,
         logger_cmd=mongo_update_cmd,
         master_q=args.master_q,
         slave_q=args.slave_q,
diff --git a/custom/SG10K/SG10K.py b/custom/SG10K/SG10K.py
index 7c43a49..b3231e5 100755
--- a/custom/SG10K/SG10K.py
+++ b/custom/SG10K/SG10K.py
@@ -28,7 +28,7 @@
 from readunits import get_readunits_from_args
 from pipelines import get_pipeline_version
 from pipelines import PipelineHandler
-from pipelines import get_site
+from pipelines import get_default_queue
 from pipelines import logger as aux_logger
 from pipelines import get_cluster_cfgfile
 
@@ -50,11 +50,6 @@
 # same as folder name. also used for cluster job names
 PIPELINE_NAME = "SG10K"
 
-DEFAULT_SLAVE_Q = {'GIS': None,
-                   'NSCC': 'production'}
-DEFAULT_MASTER_Q = {'GIS': None,
-                    'NSCC': 'production'}
-
 # global logger
 logger = logging.getLogger(__name__)
 handler = logging.StreamHandler()
@@ -77,11 +72,11 @@ def main():
         help="Give this analysis run a name (used in email and report)")
     parser.add_argument('--no-mail', action='store_true',
                         help="Don't send mail on completion")
-    site = get_site()
-    default = DEFAULT_SLAVE_Q.get(site, None)
+    #site = get_site()
+    default = get_default_queue('slave')
     parser.add_argument('-w', '--slave-q', default=default,
                         help="Queue to use for slave jobs (default: {})".format(default))
-    default = DEFAULT_MASTER_Q.get(site, None)
+    default = get_default_queue('master')
     parser.add_argument('-m', '--master-q', default=default,
                         help="Queue to use for master job (default: {})".format(default))
     parser.add_argument('-n', '--no-run', action='store_true')
@@ -167,7 +162,7 @@ def main():
     pipeline_handler = PipelineHandler(
         PIPELINE_NAME, PIPELINE_BASEDIR,
-        args.outdir, user_data, site=site,
+        args.outdir, user_data,
         master_q=args.master_q,
         slave_q=args.slave_q,
         params_cfgfile=args.params_cfg,
also used for cluster job names PIPELINE_NAME = "SG10K" -DEFAULT_SLAVE_Q = {'GIS': None, - 'NSCC': 'production'} -DEFAULT_MASTER_Q = {'GIS': None, - 'NSCC': 'production'} - # global logger logger = logging.getLogger(__name__) handler = logging.StreamHandler() @@ -77,11 +72,11 @@ def main(): help="Give this analysis run a name (used in email and report)") parser.add_argument('--no-mail', action='store_true', help="Don't send mail on completion") - site = get_site() - default = DEFAULT_SLAVE_Q.get(site, None) + #site = get_site() + default = get_default_queue('slave') parser.add_argument('-w', '--slave-q', default=default, help="Queue to use for slave jobs (default: {})".format(default)) - default = DEFAULT_MASTER_Q.get(site, None) + default = get_default_queue('master') parser.add_argument('-m', '--master-q', default=default, help="Queue to use for master job (default: {})".format(default)) parser.add_argument('-n', '--no-run', action='store_true') @@ -167,7 +162,7 @@ def main(): pipeline_handler = PipelineHandler( PIPELINE_NAME, PIPELINE_BASEDIR, - args.outdir, user_data, site=site, + args.outdir, user_data, master_q=args.master_q, slave_q=args.slave_q, params_cfgfile=args.params_cfg, diff --git a/lib/pipelines.py b/lib/pipelines.py index 5440215..aa9d73e 100644 --- a/lib/pipelines.py +++ b/lib/pipelines.py @@ -56,6 +56,26 @@ 'local': "true",# dummy command } + +DEFAULT_SLAVE_Q = { + 'GIS': { + 'enduser': None, 'production': None, + }, + 'NSCC': { + 'enduser': None, 'production': 'production', + } +} + +DEFAULT_MASTER_Q = { + 'GIS': { + 'enduser': None, 'production': None, + }, + 'NSCC': { + 'enduser': None, 'production': 'production', + } +} + + # from address, i.e. users should reply to to this # instead of rpd@gis to which we send email RPD_MAIL = "rpd@gis.a-star.edu.sg" @@ -81,7 +101,7 @@ class PipelineHandler(object): # output PIPELINE_CFGFILE = "conf.yaml" - + RC_DIR = "rc" RC_FILES = { @@ -127,15 +147,15 @@ def __init__(self, self.log_dir_rel = self.LOG_DIR_REL self.masterlog = self.MASTERLOG self.submissionlog = self.SUBMISSIONLOG - + if params_cfgfile: assert os.path.exists(params_cfgfile) self.params_cfgfile = params_cfgfile - + if modules_cfgfile: assert os.path.exists(modules_cfgfile) self.modules_cfgfile = modules_cfgfile - + if refs_cfgfile: assert os.path.exists(refs_cfgfile) self.refs_cfgfile = refs_cfgfile @@ -143,10 +163,10 @@ def __init__(self, if cluster_cfgfile: assert os.path.exists(cluster_cfgfile) self.cluster_cfgfile = cluster_cfgfile - + self.pipeline_cfgfile_out = os.path.join( self.outdir, self.PIPELINE_CFGFILE) - + # RCs self.dk_init_file = os.path.join( self.outdir, self.RC_FILES['DK_INIT']) @@ -174,7 +194,7 @@ def __init__(self, if self.cluster_cfgfile: self.cluster_cfgfile_out = os.path.join(outdir, "cluster.yaml") # else: local - + # run template self.run_template = os.path.join( pipeline_rootdir, "lib", "run.template.{}.sh".format(self.site)) @@ -266,7 +286,7 @@ def write_run_template(self): with open(self.run_out, 'w') as fh: fh.write(templ.format(**d)) - + def read_cfgfiles(self): """parse default config and replace all RPD env vars """ @@ -292,7 +312,7 @@ def read_cfgfiles(self): else: assert cfgkey not in merged_cfg merged_cfg[cfgkey] = cfg - + # determine num_chroms needed by some pipelines # FIXME ugly because sometimes not needed if merged_cfg.get('references'): @@ -517,6 +537,28 @@ def email_for_user(): return toaddr + +def is_production_user(): + return getuser() == "userrig" + + +def get_default_queue(master_or_slave): + """FIXME:add-doc + """ + 
diff --git a/mapping/BWA-MEM/BWA-MEM.py b/mapping/BWA-MEM/BWA-MEM.py
index 1c83382..ec6e4d9 100755
--- a/mapping/BWA-MEM/BWA-MEM.py
+++ b/mapping/BWA-MEM/BWA-MEM.py
@@ -28,7 +28,7 @@
 from readunits import get_readunits_from_args
 from pipelines import get_pipeline_version
 from pipelines import PipelineHandler
-from pipelines import get_site
+from pipelines import get_default_queue
 from pipelines import logger as aux_logger
 from pipelines import ref_is_indexed
 from pipelines import get_cluster_cfgfile
@@ -52,11 +52,6 @@
 # same as folder name. also used for cluster job names
 PIPELINE_NAME = "BWA-MEM"
 
-DEFAULT_SLAVE_Q = {'GIS': None,
-                   'NSCC': 'production'}
-DEFAULT_MASTER_Q = {'GIS': None,
-                    'NSCC': 'production'}
-
 # global logger
 logger = logging.getLogger(__name__)
 handler = logging.StreamHandler()
@@ -79,11 +74,11 @@ def main():
         help="Give this analysis run a name (used in email and report)")
     parser.add_argument('--no-mail', action='store_true',
                         help="Don't send mail on completion")
-    site = get_site()
-    default = DEFAULT_SLAVE_Q.get(site, None)
+    #site = get_site()
+    default = get_default_queue('slave')
     parser.add_argument('-w', '--slave-q', default=default,
                         help="Queue to use for slave jobs (default: {})".format(default))
-    default = DEFAULT_MASTER_Q.get(site, None)
+    default = get_default_queue('master')
     parser.add_argument('-m', '--master-q', default=default,
                         help="Queue to use for master job (default: {})".format(default))
     parser.add_argument('-n', '--no-run', action='store_true')
@@ -177,7 +172,7 @@ def main():
     pipeline_handler = PipelineHandler(
         PIPELINE_NAME, PIPELINE_BASEDIR,
-        args.outdir, user_data, site=site,
+        args.outdir, user_data,
         master_q=args.master_q,
         slave_q=args.slave_q,
         params_cfgfile=args.params_cfg,
diff --git a/metagenomics/essential-genes/essential-genes.py b/metagenomics/essential-genes/essential-genes.py
index 415072d..3467ac3 100755
--- a/metagenomics/essential-genes/essential-genes.py
+++ b/metagenomics/essential-genes/essential-genes.py
@@ -28,7 +28,7 @@
 from readunits import get_readunits_from_args
 from pipelines import get_pipeline_version
 from pipelines import PipelineHandler
-from pipelines import get_site
+from pipelines import get_default_queue
 from pipelines import logger as aux_logger
 from pipelines import ref_is_indexed
 from pipelines import get_cluster_cfgfile
@@ -50,11 +50,6 @@
 # same as folder name. also used for cluster job names
 PIPELINE_NAME = "essential-genes"
 
-DEFAULT_SLAVE_Q = {'GIS': None,
-                   'NSCC': 'production'}
-DEFAULT_MASTER_Q = {'GIS': None,
-                    'NSCC': 'production'}
-
 MARK_DUPS = True
 
 # global logger
@@ -80,11 +75,11 @@ def main():
         help="Give this analysis run a name (used in email and report)")
     parser.add_argument('--no-mail', action='store_true',
                         help="Don't send mail on completion")
-    site = get_site()
-    default = DEFAULT_SLAVE_Q.get(site, None)
+    #site = get_site()
+    default = get_default_queue('slave')
     parser.add_argument('-w', '--slave-q', default=default,
                         help="Queue to use for slave jobs (default: {})".format(default))
-    default = DEFAULT_MASTER_Q.get(site, None)
+    default = get_default_queue('master')
     parser.add_argument('-m', '--master-q', default=default,
                         help="Queue to use for master job (default: {})".format(default))
     parser.add_argument('-n', '--no-run', action='store_true')
@@ -186,7 +181,7 @@ def main():
     pipeline_handler = PipelineHandler(
         PIPELINE_NAME, PIPELINE_BASEDIR,
-        args.outdir, user_data, site=site,
+        args.outdir, user_data,
         master_q=args.master_q,
         slave_q=args.slave_q,
         params_cfgfile=args.params_cfg,
also used for cluster job names PIPELINE_NAME = "essential-genes" -DEFAULT_SLAVE_Q = {'GIS': None, - 'NSCC': 'production'} -DEFAULT_MASTER_Q = {'GIS': None, - 'NSCC': 'production'} - MARK_DUPS = True # global logger @@ -80,11 +75,11 @@ def main(): help="Give this analysis run a name (used in email and report)") parser.add_argument('--no-mail', action='store_true', help="Don't send mail on completion") - site = get_site() - default = DEFAULT_SLAVE_Q.get(site, None) + #site = get_site() + default = get_default_queue('slave') parser.add_argument('-w', '--slave-q', default=default, help="Queue to use for slave jobs (default: {})".format(default)) - default = DEFAULT_MASTER_Q.get(site, None) + default = get_default_queue('master') parser.add_argument('-m', '--master-q', default=default, help="Queue to use for master job (default: {})".format(default)) parser.add_argument('-n', '--no-run', action='store_true') @@ -186,7 +181,7 @@ def main(): pipeline_handler = PipelineHandler( PIPELINE_NAME, PIPELINE_BASEDIR, - args.outdir, user_data, site=site, + args.outdir, user_data, master_q=args.master_q, slave_q=args.slave_q, params_cfgfile=args.params_cfg, diff --git a/rnaseq/fluidigm-ht-c1-rnaseq/fluidigm-ht-c1-rnaseq.py b/rnaseq/fluidigm-ht-c1-rnaseq/fluidigm-ht-c1-rnaseq.py index 8b9a0fa..d7ff144 100755 --- a/rnaseq/fluidigm-ht-c1-rnaseq/fluidigm-ht-c1-rnaseq.py +++ b/rnaseq/fluidigm-ht-c1-rnaseq/fluidigm-ht-c1-rnaseq.py @@ -28,7 +28,7 @@ from readunits import get_readunits_from_args from pipelines import get_pipeline_version from pipelines import PipelineHandler -from pipelines import get_site +from pipelines import get_default_queue from pipelines import logger as aux_logger from pipelines import get_cluster_cfgfile @@ -49,11 +49,6 @@ # same as folder name. also used for cluster job names PIPELINE_NAME = "fluidigm-ht-c1-rnaseq" -DEFAULT_SLAVE_Q = {'GIS': None, - 'NSCC': 'production'} -DEFAULT_MASTER_Q = {'GIS': None, - 'NSCC': 'production'} - # global logger logger = logging.getLogger(__name__) handler = logging.StreamHandler() @@ -76,11 +71,11 @@ def main(): help="Give this analysis run a name (used in email and report)") parser.add_argument('--no-mail', action='store_true', help="Don't send mail on completion") - site = get_site() - default = DEFAULT_SLAVE_Q.get(site, None) + #site = get_site() + default = get_default_queue('slave') parser.add_argument('-w', '--slave-q', default=default, help="Queue to use for slave jobs (default: {})".format(default)) - default = DEFAULT_MASTER_Q.get(site, None) + default = get_default_queue('master') parser.add_argument('-m', '--master-q', default=default, help="Queue to use for master job (default: {})".format(default)) parser.add_argument('-n', '--no-run', action='store_true') @@ -166,7 +161,7 @@ def main(): pipeline_handler = PipelineHandler( PIPELINE_NAME, PIPELINE_BASEDIR, - args.outdir, user_data, site=site, + args.outdir, user_data, master_q=args.master_q, slave_q=args.slave_q, params_cfgfile=args.params_cfg, diff --git a/rnaseq/star-rsem/star-rsem.py b/rnaseq/star-rsem/star-rsem.py index 55afa66..3c05285 100755 --- a/rnaseq/star-rsem/star-rsem.py +++ b/rnaseq/star-rsem/star-rsem.py @@ -28,7 +28,7 @@ from readunits import get_readunits_from_args from pipelines import get_pipeline_version from pipelines import PipelineHandler -from pipelines import get_site +from pipelines import get_default_queue from pipelines import logger as aux_logger from pipelines import get_cluster_cfgfile @@ -50,11 +50,6 @@ # same as folder name. 
also used for cluster job names PIPELINE_NAME = "star-rsem" -DEFAULT_SLAVE_Q = {'GIS': None, - 'NSCC': 'production'} -DEFAULT_MASTER_Q = {'GIS': None, - 'NSCC': 'production'} - # global logger logger = logging.getLogger(__name__) handler = logging.StreamHandler() @@ -77,11 +72,11 @@ def main(): help="Give this analysis run a name (used in email and report)") parser.add_argument('--no-mail', action='store_true', help="Don't send mail on completion") - site = get_site() - default = DEFAULT_SLAVE_Q.get(site, None) + #site = get_site() + default = get_default_queue('slave') parser.add_argument('-w', '--slave-q', default=default, help="Queue to use for slave jobs (default: {})".format(default)) - default = DEFAULT_MASTER_Q.get(site, None) + default = get_default_queue('master') parser.add_argument('-m', '--master-q', default=default, help="Queue to use for master job (default: {})".format(default)) parser.add_argument('-n', '--no-run', action='store_true') @@ -181,7 +176,7 @@ def main(): pipeline_handler = PipelineHandler( PIPELINE_NAME, PIPELINE_BASEDIR, - args.outdir, user_data, site=site, + args.outdir, user_data, master_q=args.master_q, slave_q=args.slave_q, params_cfgfile=args.params_cfg, diff --git a/somatic/lofreq-somatic/lofreq-somatic.py b/somatic/lofreq-somatic/lofreq-somatic.py index 8b9317d..abaa1c0 100755 --- a/somatic/lofreq-somatic/lofreq-somatic.py +++ b/somatic/lofreq-somatic/lofreq-somatic.py @@ -28,7 +28,7 @@ from readunits import get_readunits_from_args from pipelines import get_pipeline_version from pipelines import PipelineHandler -from pipelines import get_site +from pipelines import get_default_queue from pipelines import logger as aux_logger from pipelines import get_cluster_cfgfile @@ -51,11 +51,6 @@ # same as folder name. also used for cluster job names PIPELINE_NAME = "lofreq-somatic" -DEFAULT_SLAVE_Q = {'GIS': None, - 'NSCC': 'production'} -DEFAULT_MASTER_Q = {'GIS': None, - 'NSCC': 'production'} - # global logger logger = logging.getLogger(__name__) handler = logging.StreamHandler() @@ -79,11 +74,11 @@ def main(): help="Give this analysis run a name (used in email and report)") parser.add_argument('--no-mail', action='store_true', help="Don't send mail on completion") - site = get_site() - default = DEFAULT_SLAVE_Q.get(site, None) + #site = get_site() + default = get_default_queue('slave') parser.add_argument('-w', '--slave-q', default=default, help="Queue to use for slave jobs (default: {})".format(default)) - default = DEFAULT_MASTER_Q.get(site, None) + default = get_default_queue('master') parser.add_argument('-m', '--master-q', default=default, help="Queue to use for master job (default: {})".format(default)) parser.add_argument('-n', '--no-run', action='store_true') @@ -229,7 +224,7 @@ def main(): pipeline_handler = PipelineHandler( PIPELINE_NAME, PIPELINE_BASEDIR, - args.outdir, user_data, site=site, + args.outdir, user_data, master_q=args.master_q, slave_q=args.slave_q, params_cfgfile=args.params_cfg, diff --git a/somatic/mutect/mutect.py b/somatic/mutect/mutect.py index 8982c76..f720c48 100755 --- a/somatic/mutect/mutect.py +++ b/somatic/mutect/mutect.py @@ -28,7 +28,7 @@ from readunits import get_readunits_from_args from pipelines import get_pipeline_version from pipelines import PipelineHandler -from pipelines import get_site +from pipelines import get_default_queue from pipelines import logger as aux_logger from pipelines import get_cluster_cfgfile @@ -50,11 +50,6 @@ # same as folder name. 
also used for cluster job names PIPELINE_NAME = "mutect" -DEFAULT_SLAVE_Q = {'GIS': None, - 'NSCC': 'production'} -DEFAULT_MASTER_Q = {'GIS': None, - 'NSCC': 'production'} - # global logger logger = logging.getLogger(__name__) handler = logging.StreamHandler() @@ -78,11 +73,11 @@ def main(): help="Give this analysis run a name (used in email and report)") parser.add_argument('--no-mail', action='store_true', help="Don't send mail on completion") - site = get_site() - default = DEFAULT_SLAVE_Q.get(site, None) + #site = get_site() + default = get_default_queue('slave') parser.add_argument('-w', '--slave-q', default=default, help="Queue to use for slave jobs (default: {})".format(default)) - default = DEFAULT_MASTER_Q.get(site, None) + default = get_default_queue('master') parser.add_argument('-m', '--master-q', default=default, help="Queue to use for master job (default: {})".format(default)) parser.add_argument('-n', '--no-run', action='store_true') @@ -230,7 +225,7 @@ def main(): pipeline_handler = PipelineHandler( PIPELINE_NAME, PIPELINE_BASEDIR, - args.outdir, user_data, site=site, + args.outdir, user_data, master_q=args.master_q, slave_q=args.slave_q, params_cfgfile=args.params_cfg, diff --git a/variant-calling/gatk/gatk.py b/variant-calling/gatk/gatk.py index 19a8696..62c4f86 100755 --- a/variant-calling/gatk/gatk.py +++ b/variant-calling/gatk/gatk.py @@ -28,9 +28,9 @@ from readunits import get_readunits_from_args from pipelines import get_pipeline_version from pipelines import PipelineHandler -from pipelines import get_site from pipelines import logger as aux_logger from pipelines import get_cluster_cfgfile +from pipelines import get_default_queue __author__ = "Andreas Wilm" __email__ = "wilma@gis.a-star.edu.sg" @@ -48,11 +48,6 @@ # same as folder name. 
also used for cluster job names PIPELINE_NAME = "gatk" -DEFAULT_SLAVE_Q = {'GIS': None, - 'NSCC': 'production'} -DEFAULT_MASTER_Q = {'GIS': None, - 'NSCC': 'production'} - MARK_DUPS = True # global logger @@ -77,11 +72,10 @@ def main(): help="Give this analysis run a name (used in email and report)") parser.add_argument('--no-mail', action='store_true', help="Don't send mail on completion") - site = get_site() - default = DEFAULT_SLAVE_Q.get(site, None) + default = get_default_queue('slave') parser.add_argument('-w', '--slave-q', default=default, help="Queue to use for slave jobs (default: {})".format(default)) - default = DEFAULT_MASTER_Q.get(site, None) + default = get_default_queue('master') parser.add_argument('-m', '--master-q', default=default, help="Queue to use for master job (default: {})".format(default)) parser.add_argument('-n', '--no-run', action='store_true') @@ -185,7 +179,7 @@ def main(): pipeline_handler = PipelineHandler( PIPELINE_NAME, PIPELINE_BASEDIR, - args.outdir, user_data, site=site, + args.outdir, user_data, master_q=args.master_q, slave_q=args.slave_q, params_cfgfile=args.params_cfg, diff --git a/variant-calling/lacer-lofreq/lacer-lofreq.py b/variant-calling/lacer-lofreq/lacer-lofreq.py index 1798591..dbe046b 100755 --- a/variant-calling/lacer-lofreq/lacer-lofreq.py +++ b/variant-calling/lacer-lofreq/lacer-lofreq.py @@ -28,13 +28,12 @@ from readunits import get_readunits_from_args from pipelines import get_pipeline_version from pipelines import PipelineHandler -from pipelines import get_site +from pipelines import get_default_queue from pipelines import logger as aux_logger from pipelines import ref_is_indexed from pipelines import get_cluster_cfgfile - __author__ = "Andreas Wilm" __email__ = "wilma@gis.a-star.edu.sg" __copyright__ = "2016 Genome Institute of Singapore" @@ -53,11 +52,6 @@ # same as folder name. also used for cluster job names PIPELINE_NAME = "lacer-lofreq" -DEFAULT_SLAVE_Q = {'GIS': None, - 'NSCC': 'production'} -DEFAULT_MASTER_Q = {'GIS': None, - 'NSCC': 'production'} - # global logger logger = logging.getLogger(__name__) handler = logging.StreamHandler() @@ -81,11 +75,11 @@ def main(): help="Give this analysis run a name (used in email and report)") parser.add_argument('--no-mail', action='store_true', help="Don't send mail on completion") - site = get_site() - default = DEFAULT_SLAVE_Q.get(site, None) + #site = get_site() + default = get_default_queue('slave') parser.add_argument('-w', '--slave-q', default=default, help="Queue to use for slave jobs (default: {})".format(default)) - default = DEFAULT_MASTER_Q.get(site, None) + default = get_default_queue('master') parser.add_argument('-m', '--master-q', default=default, help="Queue to use for master job (default: {})".format(default)) parser.add_argument('-n', '--no-run', action='store_true') @@ -199,7 +193,7 @@ def main(): pipeline_handler = PipelineHandler( PIPELINE_NAME, PIPELINE_BASEDIR, - args.outdir, user_data, site=site, + args.outdir, user_data, master_q=args.master_q, slave_q=args.slave_q, params_cfgfile=args.params_cfg,