Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add a generic slurm cluster submit option #283

Merged
merged 14 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,3 @@ pytest.out
src/cgr_gwas_qc/workflow/scripts/orig/
src/cgr_gwas_qc/workflow/sub_workflows/orig/
src/cgr_gwas_qc/reporting/templates/orig/

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ $ source conda/bin/activate base
```
- Install GwasQcPipeline source code
```
$ source /scratch/myfolder/GwasQcPipeline_v1.4.0/conda/bin/activate GwasQcPipeline
$ source /scratch/myfolder/GwasQcPipeline_v1.4.0/conda/bin/activate GwasQcPipeline
(GwasQcPipeline) $ pip install https://github.com/NCI-CGR/GwasQcPipeline/releases/download/v1.4.0/cgr_gwas_qc-1.4.0-py3-none-any.whl
(GwasQcPipeline) $ cgr --help
(GwasQcPipeline) $ cgr version
Expand Down
12 changes: 11 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,17 @@

project = "GwasQcPipeline"
copyright = "2020, Leidos Biomedical Research, Inc."
authors = ["Justin Fear", "Eric Karlins", "Jiahui Wang", "Cameron Palmer", "Bari Ballew", "Bin Zhu"]
authors = [
"Justin Fear",
"Eric Karlins",
"Jiahui Wang",
"Cameron Palmer",
"Bari Ballew",
"Bin Zhu",
"Caryn Willis",
"Kevin Liao",
"Jesse Marks",
]

# The full version, including alpha/beta/rc tags
release = (
Expand Down
2 changes: 1 addition & 1 deletion docs/getting_started/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ Full Example
ld_prune_r2: 0.1
maf_for_ibd: 0.2
maf_for_hwe: 0.05
ibd_pi_hat_min: 0.05
ibd_pi_hat_min: 0.12
ibd_pi_hat_max: 1.0
dup_concordance_cutoff: 0.95
intensity_threshold: 6000
Expand Down
2 changes: 1 addition & 1 deletion docs/getting_started/running_pipeline.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ For external users will need to create your own `snakemake cluster profile`_.

**Biowulf users**.
You may need to adjust ``--time-hr``, ``--local_mem_mb``, and ``--local_tasks``
if you main job is getting killed by the cluster because of resource limits.
if your main job is getting killed by the cluster because of resource limits.

The submission script will create a log file ``./gwas_qc_log.$JOB_ID`` that will have the status of your cluster submission.
Logs for each submitted job can be found in ``./logs/``.
37 changes: 24 additions & 13 deletions src/cgr_gwas_qc/cli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ def main(
project_name: Optional[str] = typer.Option(
None, help="The project name to use for this QC run."
),
slurm_partition: Optional[str] = typer.Option(
None,
help="Name of the Slurm partition to which jobs will be submitted. "
"This is required when running ``cgr submit --slurm``.",
),
include_unused_settings: bool = typer.Option(
False,
"--include-unused-settings",
Expand Down Expand Up @@ -90,22 +95,26 @@ def main(

Non-CGR users and CGR users on other systems will probably want to run::

$ cgr config -s <path to lims manifest file or sample sheet> --project-name <my project name>
$ cgr config \\
--sample-sheet <path to lims manifest file or sample sheet> \\
--project-name <my project name> \\
[--slurm-partition <partition_name>]

This will create the ``config.yml`` file in the current working directory
with place holders for reference files and user files.
This will generate the ``config.yml`` file in the current working directory,
with placeholders for reference files and user files.
Slurm users can include the ``--slurm-partition`` option to specify
the name of the queue to which your jobs will be submitted.

.. attention::
All uses should always open the ``config.yml`` file and make any
necessary changes. When you are happy then continue to ``cgr
pre-flight``.
Always review and update ``config.yml`` before each pipeline run.
Then run ``cgr pre-flight`` to ensure proper configuration.

.. warning::
The sample sheet must exist and be readable. We will raises an error if
it is not.
The sample sheet must exist and be readable.
An error will be raised if it is not.

"""
cfg = initialize_config(sample_sheet, project_name, bpm_file, genome_build)
cfg = initialize_config(sample_sheet, project_name, bpm_file, genome_build, slurm_partition)

# Update config to use paths on CGEMs/CCAD or add place holders for other systems
update_config_for_cgems(cfg) if cgems | cgems_dev else update_config_for_general(cfg)
Expand Down Expand Up @@ -137,6 +146,7 @@ def initialize_config(
project_name: Optional[str],
bpm_file: Optional[Path],
genome_build: GenomeBuild,
slurm_partition: Optional[str],
) -> Config:
"""Initialize config object.

Expand All @@ -148,6 +158,7 @@ def initialize_config(
project_name=project_name,
sample_sheet=sample_sheet.resolve(),
genome_build=genome_build,
slurm_partition=slurm_partition,
num_snps=0,
reference_files=ReferenceFiles.construct(
illumina_manifest_file=bpm_file,
Expand All @@ -157,10 +168,10 @@ def initialize_config(
user_files=UserFiles(
output_pattern="{prefix}/{file_type}.{ext}",
idat_pattern=Idat(
red="/expample/pattern/wildcards/are/columns/in/sample_sheet_file/{Project}/{Sample_ID}_Red.idat",
green="/expample/pattern/wildcards/are/columns/in/sample_sheet_file/{Project}/{Sample_ID}_Grn.idat",
red="/example/pattern/wildcards/are/columns/in/sample_sheet_file/{Project}/{Sample_ID}_Red.idat",
green="/example/pattern/wildcards/are/columns/in/sample_sheet_file/{Project}/{Sample_ID}_Grn.idat",
),
gtc_pattern="/expample/pattern/wildcards/are/columns/in/sample_sheet_file/{Project}/{Sample_ID}.gtc",
gtc_pattern="/example/pattern/wildcards/are/columns/in/sample_sheet_file/{Project}/{Sample_ID}.gtc",
),
software_params=SoftwareParams(),
workflow_params=WorkflowParams(),
Expand Down Expand Up @@ -206,7 +217,7 @@ def _add_project_name(cfg: Config):

def _sample_sheet_name_to_project_name(stem: str):
"""Convert a CGEMs sample sheet name to the project name."""
if (m := re.match(r"(?P<prefix>.*)_AnalysisManifest_(?P<suffix>.*)", stem)) :
if m := re.match(r"(?P<prefix>.*)_AnalysisManifest_(?P<suffix>.*)", stem):
return "_".join(m.groups())
return f"GwasQcPipeline_{TODAY}"

Expand Down
2 changes: 1 addition & 1 deletion src/cgr_gwas_qc/cli/snakemake.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def main(ctx: typer.Context):
snakemake -s <path to workflow install location> OPTIONS TARGETS

In addition, we also add some snakemake options if you did not specify them.
This is mostly for convenience. Specifically, the we require the use of
This is mostly for convenience. Specifically, we require the use of
``conda``, so we always will add ``--use-conda``. Second, in recent
versions of snakemake they no longer default to ``--cores 1`` and will throw
an error if you did not specify a value. We find this annoying, so we will
Expand Down
61 changes: 49 additions & 12 deletions src/cgr_gwas_qc/cli/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ def main(
biowulf: bool = typer.Option(
False, help="Run the workflow using the cli Biowulf cluster profile."
),
cluster_profile: Optional[Path] = typer.Option(
None,
help="Path to a custom cluster profile. See https://github.com/snakemake-profiles/doc.",
),
ccad2: bool = typer.Option(False, help="Run the workflow using the cli ccad2 cluster profile."),
cluster_profile2: Optional[Path] = typer.Option(
slurm: bool = typer.Option(
False,
help="Run the workflow using the generic slurm cluster profile. "
"This option requires that you specify the ``slurm_partition`` "
"in the ``config.yml`` file.",
),
cluster_profile: Optional[Path] = typer.Option(
None,
help="Path to a custom cluster profile. See https://github.com/snakemake-profiles/doc.",
),
Expand Down Expand Up @@ -83,6 +85,7 @@ def main(
The ``cgr submit`` command will create a submission script and submit it to the cluster.
We will create an optimized submission script for users of CGEMs/CCAD, CCAD2 and Biowulf.
    For other systems you will need to provide a snakemake cluster profile to tell snakemake how to use your system.
If you are submitting to a Slurm cluster, you can use the generic Slurm cluster profile we provide by including the ``--slurm`` option in your submit command.

Users running on CGEMs/CCAD will typically run::

Expand All @@ -96,18 +99,26 @@ def main(

cgr submit --ccad2

Users running on other systems will typically run::
Users running on other Slurm systems will typically run::

cgr submit --slurm

cgr submit --profile <path to custom cluster profile> --queue <name of the queue to submit main job> --submission-cmd <tool used for submit such as sbatch or qsub>
Users running with a custom cluster profile will typically run:

.. code-block:: none

cgr submit \\
--profile <path to custom cluster profile> \\
--queue <name of the queue to submit main job> \\
            --submission-cmd <tool used to submit jobs, such as sbatch or qsub>

.. note::
Sometimes it may be useful to edit the submission script before submitting it to the cluster.
In that case you can add the ``--dry-run`` option and edit the generated file in ``.snakemake/GwasQcPipeline_submission.sh``.
You would then submit this script directly to your cluster (i.e., ``qsub .snakemake/GwasQcPipeline_submission.sh``).

"""

check_exclusive_options(cgems, biowulf, ccad2, cluster_profile)
check_exclusive_options(cgems, biowulf, ccad2, slurm, cluster_profile)

payload = {
"python_executable": sys.executable,
Expand All @@ -117,6 +128,7 @@ def main(
"cgems": cgems,
"biowulf": biowulf,
"ccad2": ccad2,
"slurm": slurm,
"time_hr": time_hr,
"local_mem_mb": local_mem_mb,
"local_tasks": local_tasks,
Expand Down Expand Up @@ -153,22 +165,42 @@ def main(
payload["profile"] = get_profile("ccad2")
payload["queue"] = queue or ("defq,defq" if time_hr <= 4 else "defq")
submission_cmd = "sbatch"
elif slurm:
payload["profile"] = get_profile("slurm")
queue = cfg.config.slurm_partition
if queue is None:
raise ValueError(
"""
\033[1;31mMissing required configuration key-value pair for slurm option\033[0m\n
The `--slurm` option requires the `slurm_partition`
key-value pair to be defined in your `config.yml` file.
This pair specifies the partition where your job will
run on the Slurm scheduler. The key is `slurm_partition`
and the value should be the name of the desired partition.
You can execute `sinfo` to see which partitions are available.
Add a line like this in your config.yml:\n
\033[32mslurm_partition: <partition_name>\033[0m
"""
)
payload["queue"] = queue
submission_cmd = "sbatch"
else:
payload["profile"] = check_custom_cluster_profile(cluster_profile, queue, submission_cmd)
payload["queue"] = queue

run_script = create_submission_script(payload)

if not dry_run:
job_id = sp.check_output([submission_cmd, run_script]).decode().strip() # type: ignore
print(f"Submitted {job_id}")


def check_exclusive_options(cgems, biowulf, ccad2, cluster_profile):
if sum([cgems, biowulf, ccad2, (cluster_profile is not None)]) > 1:
def check_exclusive_options(cgems, biowulf, ccad2, slurm, cluster_profile):
if sum([cgems, biowulf, ccad2, slurm, (cluster_profile is not None)]) > 1:
typer.echo(
"\n".join(
wrap(
"Please only provide one of `--cgems`, `--biowulf`, `--ccad2, or `--cluster_profile`. "
"Please only provide one of `--cgems`, `--biowulf`, `--ccad2, `--slurm`, or `--cluster_profile`. "
"Run `cgr submit --help` for more information.",
width=100,
)
Expand Down Expand Up @@ -210,6 +242,11 @@ def get_profile(cluster: str):
if cluster == "ccad2":
return (cgr_profiles / "ccad2").as_posix()

if cluster == "slurm":
return (cgr_profiles / "slurm_generic").as_posix()
# if queue is None:
# raise ValueError("You must provide provide slurm_parition in the config.yml to use with `--slurm`.")


def create_submission_script(payload) -> str:
template = env.get_template("snakemake.sh")
Expand Down
6 changes: 5 additions & 1 deletion src/cgr_gwas_qc/cluster_profiles/biowulf/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@ def __str__(self):
" --time={time}"
" --output={log}"
)

if self.time > timedelta(hours=4):
self.queue.discard("quick")

formatted_time = f"{self.time.days}-{self.time.seconds // 3600}" # days-hours for slurm

return cmd.format(
queue=",".join(self.queue),
mem_gb=self.mem_gb,
time=str(self.time),
time=formatted_time,
threads=self.threads,
rulename=self.rulename,
job_id=self.job_id,
Expand Down
Empty file.
12 changes: 12 additions & 0 deletions src/cgr_gwas_qc/cluster_profiles/slurm_generic/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Snakemake cluster-profile configuration for the generic Slurm profile.
# Keys mirror snakemake command-line options (see the snakemake profile docs).
jobscript: jobscript.sh      # template wrapped around each submitted job
cluster: submit.py           # script that builds/executes the sbatch command
cluster-status: status.py    # script snakemake polls to get a job's state
jobs: 1000                   # max number of jobs snakemake keeps submitted at once
keep-going: true             # finish independent jobs even after one fails
latency-wait: 60             # seconds to wait for output files on shared filesystems
max-jobs-per-second: 10      # throttle submissions to avoid flooding the scheduler
max-status-checks-per-second: 10  # throttle status.py polling
printshellcmds: true         # echo each shell command for easier debugging
rerun-incomplete: true       # re-run jobs whose outputs are incomplete
restart-times: 3             # retry a failed job up to 3 times before giving up
use-conda: true              # run rules inside their declared conda environments
10 changes: 10 additions & 0 deletions src/cgr_gwas_qc/cluster_profiles/slurm_generic/jobscript.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/bash
# Snakemake jobscript template for the generic Slurm profile.
# Snakemake substitutes {properties} (job metadata JSON) and {exec_job}
# (the actual snakemake command for this job) before submission.
# properties = {properties}
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --parsable
set -euo pipefail

{exec_job}

# NOTE(review): with `set -e` above, a failing {exec_job} exits the script
# before this line, so when reached this always prints 0 — confirm intended.
echo $?
74 changes: 74 additions & 0 deletions src/cgr_gwas_qc/cluster_profiles/slurm_generic/status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#!/usr/bin/env python3
"""Snakemake ``cluster-status`` script for a generic Slurm cluster.

Given a Slurm job id as the only command-line argument, print one of
``success``, ``running``, or ``failed`` so snakemake can track the job.
"""
import logging
import re
import shlex
import subprocess as sp
import sys
import time
from typing import Optional

# Use the module's real import name for the logger. The original passed the
# literal string "__name__", which names the logger '__name__' instead of
# this module.
logger = logging.getLogger(__name__)
logger.setLevel(logging.ERROR)  # named constant instead of the magic number 40

# How many times to poll Slurm before giving up and reporting "failed".
MAX_STATUS_ATTEMPTS = 20

# Map Slurm job states onto the three statuses snakemake understands.
STATUS_CODES = {
    "BOOT_FAIL": "failed",
    "CANCELLED": "failed",
    "COMPLETED": "success",
    "DEADLINE": "failed",
    "FAILED": "failed",
    "NODE_FAIL": "failed",
    "OUT_OF_MEMORY": "failed",
    "PENDING": "running",
    "PREEMPTED": "failed",
    "RUNNING": "running",
    "REQUEUED": "running",
    "RESIZING": "running",
    "REVOKED": "running",
    "SUSPENDED": "failed",
    "TIMEOUT": "failed",
}


def main():
    """Poll Slurm for the state of the job id given in ``sys.argv[1]``.

    Tries ``sacct`` first, then ``scontrol``, sleeping 5 seconds between
    attempts; prints "failed" if no status is obtained after
    ``MAX_STATUS_ATTEMPTS`` polls.
    """
    slurm_job_id = int(sys.argv[1])

    status = None
    attempts_left = MAX_STATUS_ATTEMPTS
    while attempts_left:
        status = check_sacct(slurm_job_id) or check_scontrol(slurm_job_id)
        if status:
            break
        attempts_left -= 1
        time.sleep(5)

    print(status or "failed")


def check_sacct(job_id: int) -> Optional[str]:
    """Look up *job_id* with ``sacct`` and map its state for snakemake.

    Returns "success", "running", or "failed", or ``None`` when sacct is
    unavailable, errors out, or has no record of the job yet (the caller
    then falls back to ``scontrol``).
    """
    try:
        job_info = sp.check_output(shlex.split(f"sacct -P -b -j {job_id} -n"))
    except (sp.CalledProcessError, FileNotFoundError) as err:
        # FileNotFoundError: sacct not on PATH (accounting not installed).
        logger.error("sacct process error")
        logger.error(err)
        return None

    try:
        # With -P -b -n each line is "<JobID>|<State>|<ExitCode>".
        status = {x.split("|")[0]: x.split("|")[1] for x in job_info.decode().strip().split("\n")}
    except IndexError:
        # Empty or malformed output (a line without "|").
        return None

    # Use .get: the original indexed status[f"{job_id}"], which raised an
    # uncaught KeyError when the id was absent (e.g. only ".batch"/".extern"
    # step rows present) and crashed the status check.
    return STATUS_CODES.get(status.get(str(job_id), ""), None)


def check_scontrol(job_id: int) -> Optional[str]:
    """Look up *job_id* with ``scontrol`` (covers jobs sacct cannot see).

    Returns "success", "running", or "failed", or ``None`` when scontrol
    is unavailable, errors out, or reports no ``JobState=`` field.
    """
    try:
        job_info = sp.check_output(shlex.split(f"scontrol -o show job {job_id}"))
    except (sp.CalledProcessError, FileNotFoundError) as err:
        # FileNotFoundError: scontrol not on PATH.
        logger.error("scontrol process error")
        logger.error(err)
        return None

    m = re.search(r"JobState=(\w+)", job_info.decode())
    if m is None:
        # The original built {job_id: state} only on a match and then
        # indexed it unconditionally, raising an uncaught KeyError whenever
        # the regex failed; return None so the caller can retry instead.
        return None
    return STATUS_CODES.get(m.group(1), None)


if __name__ == "__main__":
    main()
Loading
Loading