diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml index 409ce3b7..2f686b17 100644 --- a/.github/workflows/github-actions.yml +++ b/.github/workflows/github-actions.yml @@ -76,11 +76,19 @@ jobs: steps: - name: Check out repository code uses: actions/checkout@v3 - - name: Preprocessing Smoke Test + - name: Preprocessing Smoke Test With QC uses: snakemake/snakemake-github-action@v1.24.0 with: directory: 'example/preprocess' - snakefile: 'pipelines/preprocess.snakefile' + snakefile: 'pipelines/preprocess_with_qc.snakefile' + args: '-j 2 -n --configfile pipelines/config/deeprvat_preprocess_config.yaml' + stagein: 'touch example/preprocess/workdir/reference/GRCh38.primary_assembly.genome.fa' + + - name: Preprocessing Smoke Test No QC + uses: snakemake/snakemake-github-action@v1.24.0 + with: + directory: 'example/preprocess' + snakefile: 'pipelines/preprocess_no_qc.snakefile' args: '-j 2 -n --configfile pipelines/config/deeprvat_preprocess_config.yaml' stagein: 'touch example/preprocess/workdir/reference/GRCh38.primary_assembly.genome.fa' @@ -98,7 +106,48 @@ jobs: args: '-j 2 -n --configfile pipelines/config/deeprvat_annotation_config.yaml' - DeepRVAT-Preprocessing-Pipeline-Tests: + DeepRVAT-Preprocessing-Pipeline-Tests-No-QC: + runs-on: ubuntu-latest + needs: DeepRVAT-Preprocessing-Pipeline-Smoke-Tests + steps: + + - name: Check out repository code + uses: actions/checkout@v3 + - uses: mamba-org/setup-micromamba@v1.4.3 + with: + environment-name: deeprvat-preprocess-gh-action + environment-file: ${{ github.workspace }}/deeprvat_preprocessing_env.yml + cache-environment: true + cache-downloads: true + + - name: Install DeepRVAT + run: pip install -e ${{ github.workspace }} + shell: micromamba-shell {0} + + - name: Cache Fasta file + id: cache-fasta + uses: actions/cache@v3 + with: + path: example/preprocess/workdir/reference + key: ${{ runner.os }}-reference-fasta + + - name: Download and unpack fasta data + if: steps.cache-fasta.outputs.cache-hit != 'true' + run: | + cd ${{ github.workspace }}/example/preprocess && \ + wget https://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_44/GRCh38.primary_assembly.genome.fa.gz \ + -O workdir/reference/GRCh38.primary_assembly.genome.fa.gz \ + && gzip -d workdir/reference/GRCh38.primary_assembly.genome.fa.gz + + - name: Run preprocessing pipeline + run: | + python -m snakemake -j 2 --directory ${{ github.workspace }}/example/preprocess \ + --snakefile ${{ github.workspace }}/pipelines/preprocess_no_qc.snakefile \ + --configfile ${{ github.workspace }}/pipelines/config/deeprvat_preprocess_config.yaml --show-failed-logs + shell: micromamba-shell {0} + + + DeepRVAT-Preprocessing-Pipeline-Tests-With-QC: runs-on: ubuntu-latest needs: DeepRVAT-Preprocessing-Pipeline-Smoke-Tests steps: @@ -134,6 +183,6 @@ jobs: - name: Run preprocessing pipeline run: | python -m snakemake -j 2 --directory ${{ github.workspace }}/example/preprocess \ - --snakefile ${{ github.workspace }}/pipelines/preprocess.snakefile \ + --snakefile ${{ github.workspace }}/pipelines/preprocess_with_qc.snakefile \ --configfile ${{ github.workspace }}/pipelines/config/deeprvat_preprocess_config.yaml --show-failed-logs shell: micromamba-shell {0} diff --git a/deeprvat/preprocessing/preprocess.py b/deeprvat/preprocessing/preprocess.py index c3d4a0f8..46eba386 100644 --- a/deeprvat/preprocessing/preprocess.py +++ b/deeprvat/preprocessing/preprocess.py @@ -1,6 +1,5 @@ import gc import logging -import os import sys import time from pathlib import Path @@ -49,10 +48,10 @@ def process_sparse_gt_file( samples: List[str], calls_to_exclude: pd.DataFrame = None, ) -> Tuple[List[np.ndarray], List[np.ndarray]]: - sparse_gt = pd.read_csv( + sparse_gt = pd.read_table( file, names=["chrom", "pos", "ref", "alt", "sample", "gt"], - sep="\t", + engine="pyarrow", index_col=None, ) sparse_gt = sparse_gt[sparse_gt["sample"].isin(samples)] @@ -146,6 +145,18 @@ def add_variant_ids(variant_file: str, out_file: str, duplicates_file: str): ) +def get_file_chromosome(file, col_names, chrom_field="chrom"): + chrom_df = pd.read_csv( + file, names=col_names, sep="\t", index_col=None, nrows=1, usecols=[chrom_field] + ) + + chrom = None + if not chrom_df.empty: + chrom = chrom_df[chrom_field][0] + + return chrom + + @cli.command() @click.option("--exclude-variants", type=click.Path(exists=True), multiple=True) @click.option("--exclude-samples", type=click.Path(exists=True)) @@ -171,19 +182,24 @@ def process_sparse_gt( ): logging.info("Reading variants...") start_time = time.time() - variants = pd.read_csv(variant_file, sep="\t") + + variants = pd.read_table(variant_file, engine="pyarrow") + + # Filter all variants based on chromosome if chromosomes is not None: chromosomes = [f"chr{chrom}" for chrom in chromosomes.split(",")] variants = variants[variants["chrom"].isin(chromosomes)] total_variants = len(variants) + if len(exclude_variants) > 0: variant_exclusion_files = [ - v for directory in exclude_variants for v in Path(directory).glob("*.tsv*") + v for directory in exclude_variants for v in Path(directory).rglob("*.tsv*") ] + exclusion_file_cols = ["chrom", "pos", "ref", "alt"] variants_to_exclude = pd.concat( [ - pd.read_csv(v, sep="\t", names=["chrom", "pos", "ref", "alt"]) + pd.read_csv(v, sep="\t", names=exclusion_file_cols) for v in tqdm(variant_exclusion_files) ], ignore_index=True, @@ -212,7 +228,7 @@ def process_sparse_gt( if exclude_samples is not None: total_samples = len(samples) - if sample_exclusion_files := list(Path(exclude_samples).glob("*.csv")): + if sample_exclusion_files := list(Path(exclude_samples).rglob("*.csv")): samples_to_exclude = set( pd.concat( [ @@ -236,39 +252,51 @@ def process_sparse_gt( for chrom in tqdm(variant_groups.groups.keys()): logging.info(f"Processing chromosome {chrom}") logging.info("Reading in filtered calls") + + exclude_calls_file_cols = ["chrom", "pos", "ref", "alt", "sample"] + + calls_to_exclude = pd.DataFrame(columns=exclude_calls_file_cols) + if exclude_calls is not None: - chrom_dir = os.path.join(exclude_calls, chrom) - exclude_calls_chrom = Path(chrom_dir).glob("*.tsv*") - - calls_to_exclude = pd.concat( - [ - pd.read_csv( - c, - names=["chrom", "pos", "ref", "alt", "sample"], - sep="\t", - index_col=None, - ) - for c in tqdm(exclude_calls_chrom) - ], - ignore_index=True, - ) + exclude_calls_chrom = Path(exclude_calls).rglob("*.tsv*") + exclude_calls_chrom_files = [] + for exclude_call_file in exclude_calls_chrom: + file_chrom = get_file_chromosome( + exclude_call_file, col_names=exclude_calls_file_cols + ) - calls_dropped = len(calls_to_exclude) - total_calls_dropped += calls_dropped - logging.info(f"Dropped {calls_dropped} calls") - else: - calls_to_exclude = pd.DataFrame( - columns=["chrom", "pos", "ref", "alt", "sample"] - ) + if file_chrom == chrom: + exclude_calls_chrom_files.append(exclude_call_file) - logging.info("Processing sparse GT files") + if exclude_calls_chrom_files: + calls_to_exclude = pd.concat( + [ + pd.read_csv( + c, + names=exclude_calls_file_cols, + sep="\t", + index_col=None, + ) + for c in tqdm(exclude_calls_chrom_files) + ], + ignore_index=True, + ) + calls_dropped = len(calls_to_exclude) + total_calls_dropped += calls_dropped + logging.info(f"Dropped {calls_dropped} calls") - chrom_dir = os.path.join(sparse_gt, chrom) - logging.info(f"chrom dir is {chrom_dir}") + logging.info("Processing sparse GT files") variants_chrom = variant_groups.get_group(chrom) - sparse_gt_chrom = list(Path(chrom_dir).glob("*.tsv*")) + sparse_file_cols = ["chrom", "pos", "ref", "alt", "sample", "gt"] + + sparse_gt_chrom = [ + f + for f in Path(sparse_gt).rglob("*.tsv*") + if get_file_chromosome(f, col_names=sparse_file_cols) == chrom + ] + logging.info(f"sparse gt chrom(es) are: {sparse_gt_chrom}") processed = Parallel(n_jobs=threads, verbose=50)( diff --git a/docs/_static/preprocess_rulegraph_no_qc.svg b/docs/_static/preprocess_rulegraph_no_qc.svg new file mode 100644 index 00000000..3edc0a0e --- /dev/null +++ b/docs/_static/preprocess_rulegraph_no_qc.svg @@ -0,0 +1,169 @@ + + + + + + +snakemake_dag + + + +0 + +all + + + +1 + +combine_genotypes + + + +1->0 + + + + + +2 + +preprocess_no_qc + + + +2->1 + + + + + +3 + +add_variant_ids + + + +3->0 + + + + + +3->2 + + + + + +4 + +concatenate_variants + + + +4->3 + + + + + +9 + +create_parquet_variant_ids + + + +4->9 + + + + + +5 + +variants + + + +5->4 + + + + + +6 + +normalize + + + +6->5 + + + + + +10 + +sparsify + + + +6->10 + + + + + +7 + +extract_samples + + + +7->2 + + + + + +7->6 + + + + + +8 + +index_fasta + + + +8->6 + + + + + +9->0 + + + + + +9->2 + + + + + +10->2 + + + + + diff --git a/docs/_static/preprocess_rulegraph.svg b/docs/_static/preprocess_rulegraph_with_qc.svg similarity index 77% rename from docs/_static/preprocess_rulegraph.svg rename to docs/_static/preprocess_rulegraph_with_qc.svg index 22930d7a..5c3d86e5 100644 --- a/docs/_static/preprocess_rulegraph.svg +++ b/docs/_static/preprocess_rulegraph_with_qc.svg @@ -1,7 +1,7 @@ - 0 - + all 1 - + combine_genotypes - + 1->0 - + 2 - -preprocess + +preprocess_with_qc 2->1 - + 3 - + add_variant_ids - + 3->0 - + - + 3->2 - - + + 4 - + concatenate_variants 4->3 - + 9 - + create_parquet_variant_ids 4->9 - + 5 - + variants 5->4 - + 6 - + normalize 6->5 - + 10 - + sparsify 6->10 - + 11 - + qc_varmiss 6->11 - + 12 - + qc_hwe 6->12 - + 13 - + qc_read_depth 6->13 - + 14 - + qc_allelic_imbalance 6->14 - + 7 - + extract_samples - + 7->2 - - + + 7->6 - + 8 - + index_fasta 8->6 - + - + 9->0 - + - + 9->2 - - + + - + 10->2 - + - + 11->2 - + - + 12->2 - + - + 13->2 - + - + 14->2 - + 15 - + create_excluded_samples_dir - + 15->2 - - + + diff --git a/docs/preprocessing.md b/docs/preprocessing.md index cb90335b..6db444d8 100644 --- a/docs/preprocessing.md +++ b/docs/preprocessing.md @@ -3,7 +3,7 @@ The DeepRVAT preprocessing pipeline is based on [snakemake](https://snakemake.readthedocs.io/en/stable/) it uses [bcftools+samstools](https://www.htslib.org/) and a [python script](https://github.com/PMBio/deeprvat/blob/main/deeprvat/preprocessing/preprocess.py) preprocessing.py. -![DeepRVAT preprocessing pipeline](_static/preprocess_rulegraph.svg) +![DeepRVAT preprocessing pipeline](_static/preprocess_rulegraph_no_qc.svg) ## Output @@ -48,42 +48,41 @@ An example file is included in this repo: [example config](https://github.com/PM ```yaml # What chromosomes should be processed -included_chromosomes: [ 20,21,22 ] +included_chromosomes : [21,22] -# If you need to run a cmd to load bcf and samtools specify it here -bcftools_load_cmd: module load bcftools/1.10.2 && -samtools_load_cmd: module load samtools/1.9 && +# If you need to run a cmd to load bcf and samtools specify it here, see example +bcftools_load_cmd : # module load bcftools/1.10.2 && +samtools_load_cmd : # module load samtools/1.9 && # Path to where you want to write results and intermediate data -working_dir: /workdir +working_dir: workdir # Path to ukbb data -data_dir: /data +data_dir: data # These paths are all relative to the data dir -input_vcf_dir_name: vcf metadata_dir_name: metadata -# expected to be found in the data_dir / metadata_dir -pvcf_blocks_file: pvcf_blocks.txt - # These paths are all relative to the working dir # Here will the finished preprocessed files end up -preprocessed_dir_name: preprocesed +preprocessed_dir_name : preprocesed # Path to directory with fasta reference file -reference_dir_name: reference +reference_dir_name : reference # Here we will store normalized bcf files -norm_dir_name: norm +norm_dir_name : norm # Here we store "sparsified" bcf files -sparse_dir_name: sparse +sparse_dir_name : sparse # Expected to be found in working_dir/reference_dir -reference_fasta_file: GRCh38_full_analysis_set_plus_decoy_hla.fa +reference_fasta_file : GRCh38.primary_assembly.genome.fa # The format of the name of the "raw" vcf files -vcf_filename_pattern: ukb23156_c{chr}_b{block}_v1.vcf.gz +vcf_files_list: vcf_files_list.txt # Number of threads to use in the preprocessing script, separate from snakemake threads preprocess_threads: 16 + +# You can specify a different zcat cmd for example gzcat here, default zcat +zcat_cmd: gzcat ``` The config above would use the following directory structure: @@ -114,9 +113,31 @@ parent_directory ``` +### vcf_files_list +The `vcf_files_list` variable specifies the path to a text file that contains paths to the raw vcf files you want to +process. + +ex: + + +```text +data/vcf/test_vcf_data_c21_b1.vcf.gz +data/vcf/test_vcf_data_c22_b1.vcf.gz +``` + +The easiest way to create `vcf_files_list` (if you have your files in `data/vcf` under the `parent_directory`) +```shell +cd +find data/vcf -type f -name "*.vcf*" > vcf_files_list.txt +``` ## Running the preprocess pipeline -### Run the preprocess pipeline with example data +There are two versions of the pipeline, one with qc (quality control) and one without, the version with qc is the one +we used when we wrote the paper. The qc is specific to the UKBB data, so if you want/need to do your own qc use the +pipeline without qc. + +### Run the preprocess pipeline with example data and qc +![DeepRVAT preprocessing pipeline](_static/preprocess_rulegraph_with_qc.svg) *The vcf files in the example data folder was generated using [fake-vcf](https://github.com/endast/fake-vcf) (with some manual editing). @@ -144,7 +165,7 @@ gzip -d workdir/reference/GRCh38.primary_assembly.genome.fa.gz 4. Run with the example config ```shell -snakemake -j 1 --snakefile ../../pipelines/preprocess.snakefile --configfile ../../pipelines/config/deeprvat_preprocess_config.yaml +snakemake -j 1 --snakefile ../../pipelines/preprocess_with_qc.snakefile --configfile ../../pipelines/config/deeprvat_preprocess_config.yaml ``` 5. Enjoy the preprocessed data 🎉 @@ -157,10 +178,63 @@ total 48 -rw-r--r-- 1 user staff 6354 Aug 2 14:06 genotypes_chr22.h5 ``` -### Run on your own data + +### Run the preprocess pipeline with example data and no qc + +![DeepRVAT preprocessing pipeline](_static/preprocess_rulegraph_no_qc.svg) + +*The vcf files in the example data folder was generated using [fake-vcf](https://github.com/endast/fake-vcf) (with some +manual editing). +hence does not contain real data.* + +1. cd into the preprocessing example dir + +```shell +cd +cd example/preprocess +``` + +2. Download the fasta file + +```shell +wget https://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_44/GRCh38.primary_assembly.genome.fa.gz -P workdir/reference +``` + +3. Unpack the fasta file + +```shell +gzip -d workdir/reference/GRCh38.primary_assembly.genome.fa.gz +``` + +4. Run with the example config + +```shell +snakemake -j 1 --snakefile ../../pipelines/preprocess_no_qc.snakefile --configfile ../../pipelines/config/deeprvat_preprocess_config.yaml +``` + +5. Enjoy the preprocessed data 🎉 + +```shell +ls -l workdir/preprocesed +total 48 +-rw-r--r-- 1 user staff 6404 Aug 2 14:06 genotypes.h5 +-rw-r--r-- 1 user staff 6354 Aug 2 14:06 genotypes_chr21.h5 +-rw-r--r-- 1 user staff 6354 Aug 2 14:06 genotypes_chr22.h5 +``` + +### Run on your own data with qc After configuration and activating the environment run the pipeline using snakemake: ```shell -snakemake -j --configfile config/deeprvat_preprocess_config.yaml -s preprocess.snakefile +snakemake -j --configfile config/deeprvat_preprocess_config.yaml -s preprocess_with_qc.snakefile ``` + + +### Run on your own data without qc + +After configuration and activating the environment run the pipeline using snakemake: + +```shell +snakemake -j --configfile config/deeprvat_preprocess_config.yaml -s preprocess_no_qc.snakefile +``` \ No newline at end of file diff --git a/example/preprocess/data/metadata/pvcf_blocks.txt b/example/preprocess/data/metadata/pvcf_blocks.txt deleted file mode 100644 index 0034a46a..00000000 --- a/example/preprocess/data/metadata/pvcf_blocks.txt +++ /dev/null @@ -1,3 +0,0 @@ -0 20 1 60114 64333662 -1 21 1 50338712 70338712 -2 22 1 110516173 150807862 diff --git a/example/preprocess/vcf_files_list.txt b/example/preprocess/vcf_files_list.txt new file mode 100644 index 00000000..6b196815 --- /dev/null +++ b/example/preprocess/vcf_files_list.txt @@ -0,0 +1,2 @@ +data/vcf/test_vcf_data_c21_b1.vcf.gz +data/vcf/test_vcf_data_c22_b1.vcf.gz diff --git a/pipelines/config/deeprvat_preprocess_config.yaml b/pipelines/config/deeprvat_preprocess_config.yaml index 8275d8cb..1a0173a9 100644 --- a/pipelines/config/deeprvat_preprocess_config.yaml +++ b/pipelines/config/deeprvat_preprocess_config.yaml @@ -11,12 +11,8 @@ working_dir: workdir data_dir: data # These paths are all relative to the data dir -input_vcf_dir_name : vcf metadata_dir_name: metadata -# expected to be found in the data_dir / metadata_dir -pvcf_blocks_file: pvcf_blocks.txt - # These paths are all relative to the working dir # Here will the finished preprocessed files end up preprocessed_dir_name : preprocesed @@ -31,9 +27,7 @@ sparse_dir_name : sparse reference_fasta_file : GRCh38.primary_assembly.genome.fa # The format of the name of the "raw" vcf files -vcf_filename_pattern: test_vcf_data_c{chr}_b{block} -# for ukbb data use this pattern: -#vcf_filename_pattern: ukb23156_c{chr}_b{block}_v1 +vcf_files_list: vcf_files_list.txt # Number of threads to use in the preprocessing script, separate from snakemake threads preprocess_threads: 16 diff --git a/pipelines/preprocess.snakefile b/pipelines/preprocess.snakefile deleted file mode 100644 index 6f93a866..00000000 --- a/pipelines/preprocess.snakefile +++ /dev/null @@ -1,305 +0,0 @@ -import pandas as pd -from pathlib import Path - - -configfile: "config/deeprvat_preprocess_config.yaml" - -load_samtools = config.get("samtools_load_cmd") or "" -load_bcftools = config.get("bcftools_load_cmd") or "" -zcat_cmd = config.get("zcat_cmd") or "zcat" - -preprocessing_cmd = "deeprvat_preprocess" - -working_dir = Path(config["working_dir"]) -data_dir = Path(config["data_dir"]) -preprocessed_dir = working_dir / config["preprocessed_dir_name"] -vcf_dir = data_dir / config["input_vcf_dir_name"] -metadata_dir = data_dir / config["metadata_dir_name"] -reference_dir = working_dir / config["reference_dir_name"] - -preprocess_threads = config["preprocess_threads"] - -fasta_file = reference_dir / config["reference_fasta_file"] -fasta_index_file = reference_dir / f"{config['reference_fasta_file']}.fai" - -norm_dir = working_dir / config["norm_dir_name"] -sparse_dir = norm_dir / config["sparse_dir_name"] -bcf_dir = norm_dir / "bcf" -norm_variants_dir = norm_dir / "variants" - -qc_dir = working_dir / "qc" -qc_indmiss_stats_dir = qc_dir / "indmiss/stats" -qc_indmiss_samples_dir = qc_dir / "indmiss/samples" -qc_indmiss_sites_dir = qc_dir / "indmiss/sites" -qc_varmiss_dir = qc_dir / "varmiss" -qc_hwe_dir = qc_dir / "hwe" -qc_read_depth_dir = qc_dir / "read_depth" -qc_allelic_imbalance_dir = qc_dir / "allelic_imbalance" -qc_duplicate_vars_dir = qc_dir / "duplicate_vars" -qc_filtered_samples_dir = qc_dir / "filtered_samples" - -vcf_filename_pattern = config["vcf_filename_pattern"] -vcf_files = vcf_dir / f"{vcf_filename_pattern}.vcf.gz" - -pvcf_blocks_df = pd.read_csv( - metadata_dir / config["pvcf_blocks_file"], - sep="\t", - header=None, - names=["Index", "Chromosome", "Block", "First position", "Last position"], - dtype={"Chromosome": str}, -).set_index("Index") - -# Filter out which chromosomes to work with -pvcf_blocks_df = pvcf_blocks_df[ - pvcf_blocks_df["Chromosome"].isin([str(c) for c in config["included_chromosomes"]]) -] - -chr_mapping = pd.Series( - [str(x) for x in range(1,23)] + ["X", "Y"],index=[str(x) for x in range(1,25)] -) -inv_chr_mapping = pd.Series( - [str(x) for x in range(1,25)],index=[str(x) for x in range(1,23)] + ["X", "Y"] -) - -pvcf_blocks_df["chr_name"] = chr_mapping.loc[pvcf_blocks_df["Chromosome"].values].values -chromosomes = pvcf_blocks_df["chr_name"] -block = pvcf_blocks_df["Block"] - - -rule all: - input: - preprocessed_dir / "genotypes.h5", - norm_variants_dir / "variants.tsv.gz", - variants=norm_variants_dir / "variants.parquet", - - -rule combine_genotypes: - input: - expand( - preprocessed_dir / "genotypes_chr{chr}.h5", - zip, - chr=chromosomes, - block=block, - ), - output: - preprocessed_dir / "genotypes.h5", - shell: - f"{preprocessing_cmd} combine-genotypes {{input}} {{output}}" - - -rule create_excluded_samples_dir: - output: - directory(qc_filtered_samples_dir), - shell: - "mkdir -p {output}" - - -rule preprocess: - input: - variants=norm_variants_dir / "variants.tsv.gz", - variants_parquet=norm_variants_dir / "variants.parquet", - samples=norm_dir / "samples_chr.csv", - sparse_tg=expand( - sparse_dir / "chr{chr}" / f"{vcf_filename_pattern}.tsv.gz", - zip, - chr=chromosomes, - block=block, - ), - qc_varmiss=expand( - qc_varmiss_dir / f"{vcf_filename_pattern}.tsv.gz", - zip, - chr=chromosomes, - block=block, - ), - qc_hwe=expand( - qc_hwe_dir / f"{vcf_filename_pattern}.tsv.gz", - zip, - chr=chromosomes, - block=block, - ), - qc_read_depth=expand( - qc_read_depth_dir / "chr{chr}" / f"{vcf_filename_pattern}.tsv.gz", - zip, - chr=chromosomes, - block=block, - ), - qc_allelic_imbalance=expand( - qc_allelic_imbalance_dir / f"{vcf_filename_pattern}.tsv.gz", - zip, - chr=chromosomes, - block=block, - ), - qc_filtered_samples=qc_filtered_samples_dir, - output: - expand(preprocessed_dir / "genotypes_chr{chr}.h5",chr=set(chromosomes)), - shell: - " ".join( - [ - f"{preprocessing_cmd}", - "process-sparse-gt", - f"--exclude-variants {qc_allelic_imbalance_dir}", - f"--exclude-variants {qc_hwe_dir}", - f"--exclude-variants {qc_varmiss_dir}", - f"--exclude-variants {qc_duplicate_vars_dir}", - f"--exclude-calls {qc_read_depth_dir}", - f"--exclude-samples {qc_filtered_samples_dir}", - "--chromosomes ", - ",".join(str(chr) for chr in set(chromosomes)), - f"--threads {preprocess_threads}", - "{input.variants}", - "{input.samples}", - f"{sparse_dir}", - f"{preprocessed_dir / 'genotypes'}", - ] - ) - - -rule all_qc: - input: - expand( - [ - qc_varmiss_dir / f"{vcf_filename_pattern}.tsv.gz", - qc_hwe_dir / f"{vcf_filename_pattern}.tsv.gz", - qc_read_depth_dir / "chr{chr}" / f"{vcf_filename_pattern}.tsv.gz", - qc_allelic_imbalance_dir / f"{vcf_filename_pattern}.tsv.gz", - ], - zip, - chr=chromosomes, - block=block, - ), - - -rule qc_varmiss: - input: - bcf_dir / "{vcf_filename_pattern}.bcf", - output: - qc_varmiss_dir / "{vcf_filename_pattern}.tsv.gz", - shell: - f'{load_bcftools} bcftools query --format "%CHROM\t%POS\t%REF\t%ALT\n" --include "F_MISSING >= 0.1" {{input}} | gzip > {{output}}' - - -rule qc_hwe: - input: - bcf_dir / "{vcf_filename_pattern}.bcf", - output: - qc_hwe_dir / "{vcf_filename_pattern}.tsv.gz", - shell: - f'{load_bcftools} bcftools +fill-tags --output-type u {{input}} -- --tags HWE | bcftools query --format "%CHROM\t%POS\t%REF\t%ALT\n" --include "INFO/HWE <= 1e-15" | gzip > {{output}}' - - -rule qc_read_depth: - input: - bcf_dir / f"{vcf_filename_pattern}.bcf", - output: - qc_read_depth_dir / "chr{chr}" / f"{vcf_filename_pattern}.tsv.gz", - shell: - f"""{load_bcftools} bcftools query --format '[%CHROM\\t%POS\\t%REF\\t%ALT\\t%SAMPLE\\n]' --include '(GT!="RR" & GT!="mis" & TYPE="snp" & FORMAT/DP < 7) | (GT!="RR" & GT!="mis" & TYPE="indel" & FORMAT/DP < 10)' {{input}} | gzip > {{output}}""" - - -rule qc_allelic_imbalance: - input: - bcf_dir / "{vcf_filename_pattern}.bcf", - output: - qc_allelic_imbalance_dir / "{vcf_filename_pattern}.tsv.gz", - shell: - f"""{load_bcftools} bcftools query --format '%CHROM\t%POS\t%REF\t%ALT\n' --exclude 'COUNT(GT="het")=0 || (GT="het" & ((TYPE="snp" & (FORMAT/AD[*:1] / FORMAT/AD[*:0]) > 0.15) | (TYPE="indel" & (FORMAT/AD[*:1] / FORMAT/AD[*:0]) > 0.20)))' {{input}} | gzip > {{output}}""" - - -rule all_preprocess: - input: - expand( - [ - bcf_dir / f"{vcf_filename_pattern}.bcf", - sparse_dir / "chr{chr}" / f"{vcf_filename_pattern}.tsv.gz", - norm_variants_dir / f"{vcf_filename_pattern}.tsv.gz", - ], - zip, - chr=chromosomes, - block=block, - ), - norm_variants_dir / "variants_no_id.tsv.gz", - norm_variants_dir / "variants.tsv.gz", - qc_duplicate_vars_dir / "duplicates.tsv", - - -rule normalize: - input: - vcf=vcf_files, - samplefile=norm_dir / "samples_chr.csv", - fasta=fasta_file, - fastaindex=fasta_index_file, - output: - bcf_dir / f"{vcf_filename_pattern}.bcf", - shell: - f"""{load_bcftools} bcftools view --samples-file {{input.samplefile}} --output-type u {{input.vcf}} | bcftools view --include 'COUNT(GT="alt") > 0' --output-type u | bcftools norm -m-both -f {{input.fasta}} --output-type b --output {{output}}""" - - -rule sparsify: - input: - bcf=bcf_dir / f"{vcf_filename_pattern}.bcf", - output: - tsv=sparse_dir / "chr{chr}" / f"{vcf_filename_pattern}.tsv.gz", - shell: - f"""{load_bcftools} bcftools query --format '[%CHROM\t%POS\t%REF\t%ALT\t%SAMPLE\t%GT\n]' --include 'GT!="RR" & GT!="mis"' {{input.bcf}} \ - | sed 's/0[/,|]1/1/; s/1[/,|]0/1/; s/1[/,|]1/2/; s/0[/,|]0/0/' | gzip > {{output.tsv}}""" - - -rule variants: - input: - bcf=bcf_dir / f"{vcf_filename_pattern}.bcf", - output: - norm_variants_dir / f"{vcf_filename_pattern}.tsv.gz", - shell: - f"{load_bcftools} bcftools query --format '%CHROM\t%POS\t%REF\t%ALT\n' {{input}} | gzip > {{output}}" - - -rule concatenate_variants: - input: - expand( - norm_variants_dir / f"{vcf_filename_pattern}.tsv.gz", - zip, - chr=chromosomes, - block=block, - ), - output: - norm_variants_dir / "variants_no_id.tsv.gz", - shell: - "{zcat_cmd} {input} | gzip > {output}" - - -rule add_variant_ids: - input: - norm_variants_dir / "variants_no_id.tsv.gz", - output: - variants=norm_variants_dir / "variants.tsv.gz", - duplicates=qc_duplicate_vars_dir / "duplicates.tsv", - shell: - f"{preprocessing_cmd} add-variant-ids {{input}} {{output.variants}} {{output.duplicates}}" - - -rule create_parquet_variant_ids: - input: - norm_variants_dir / "variants_no_id.tsv.gz", - output: - variants=norm_variants_dir / "variants.parquet", - duplicates=qc_duplicate_vars_dir / "duplicates.parquet", - shell: - f"{preprocessing_cmd} add-variant-ids {{input}} {{output.variants}} {{output.duplicates}}" - - -rule extract_samples: - input: - expand(vcf_files,zip,chr=chromosomes,block=block), - output: - norm_dir / "samples_chr.csv", - shell: - f"{load_bcftools} bcftools query --list-samples {{input}} > {{output}}" - - -rule index_fasta: - input: - fasta=fasta_file, - output: - fasta_index_file, - shell: - f"{load_samtools} samtools faidx {{input.fasta}}" diff --git a/pipelines/preprocess_no_qc.snakefile b/pipelines/preprocess_no_qc.snakefile new file mode 100644 index 00000000..a98c60d6 --- /dev/null +++ b/pipelines/preprocess_no_qc.snakefile @@ -0,0 +1,33 @@ +include: "preprocessing/preprocess.snakefile" + + +rule all: + input: + preprocessed_dir / "genotypes.h5", + norm_variants_dir / "variants.tsv.gz", + variants=norm_variants_dir / "variants.parquet", + + +rule preprocess_no_qc: + input: + variants=norm_variants_dir / "variants.tsv.gz", + variants_parquet=norm_variants_dir / "variants.parquet", + samples=norm_dir / "samples_chr.csv", + sparse_tg=expand(sparse_dir / "{vcf_stem}.tsv.gz", vcf_stem=vcf_stems), + output: + expand(preprocessed_dir / "genotypes_chr{chr}.h5", chr=chromosomes), + shell: + " ".join( + [ + f"{preprocessing_cmd}", + "process-sparse-gt", + f"--exclude-variants {qc_duplicate_vars_dir}", + "--chromosomes ", + ",".join(str(chr) for chr in set(chromosomes)), + f"--threads {preprocess_threads}", + "{input.variants}", + "{input.samples}", + f"{sparse_dir}", + f"{preprocessed_dir / 'genotypes'}", + ] + ) diff --git a/pipelines/preprocess_with_qc.snakefile b/pipelines/preprocess_with_qc.snakefile new file mode 100644 index 00000000..f0d2c465 --- /dev/null +++ b/pipelines/preprocess_with_qc.snakefile @@ -0,0 +1,49 @@ + +include: "preprocessing/preprocess.snakefile" +include: "preprocessing/qc.snakefile" + + +rule all: + input: + preprocessed_dir / "genotypes.h5", + norm_variants_dir / "variants.tsv.gz", + variants=norm_variants_dir / "variants.parquet", + + +rule preprocess_with_qc: + input: + variants=norm_variants_dir / "variants.tsv.gz", + variants_parquet=norm_variants_dir / "variants.parquet", + samples=norm_dir / "samples_chr.csv", + sparse_tg=expand(sparse_dir / "{vcf_stem}.tsv.gz", vcf_stem=vcf_stems), + qc_varmiss=expand(qc_varmiss_dir / "{vcf_stem}.tsv.gz", vcf_stem=vcf_stems), + qc_hwe=expand(qc_hwe_dir / "{vcf_stem}.tsv.gz", vcf_stem=vcf_stems), + qc_read_depth=expand( + qc_read_depth_dir / "{vcf_stem}.tsv.gz", vcf_stem=vcf_stems + ), + qc_allelic_imbalance=expand( + qc_allelic_imbalance_dir / "{vcf_stem}.tsv.gz", vcf_stem=vcf_stems + ), + qc_filtered_samples=qc_filtered_samples_dir, + output: + expand(preprocessed_dir / "genotypes_chr{chr}.h5", chr=chromosomes), + shell: + " ".join( + [ + f"{preprocessing_cmd}", + "process-sparse-gt", + f"--exclude-variants {qc_allelic_imbalance_dir}", + f"--exclude-variants {qc_hwe_dir}", + f"--exclude-variants {qc_varmiss_dir}", + f"--exclude-variants {qc_duplicate_vars_dir}", + f"--exclude-calls {qc_read_depth_dir}", + f"--exclude-samples {qc_filtered_samples_dir}", + "--chromosomes ", + ",".join(str(chr) for chr in set(chromosomes)), + f"--threads {preprocess_threads}", + "{input.variants}", + "{input.samples}", + f"{sparse_dir}", + f"{preprocessed_dir / 'genotypes'}", + ] + ) diff --git a/pipelines/preprocessing/preprocess.snakefile b/pipelines/preprocessing/preprocess.snakefile new file mode 100644 index 00000000..87320690 --- /dev/null +++ b/pipelines/preprocessing/preprocess.snakefile @@ -0,0 +1,140 @@ +from pathlib import Path + + +configfile: "config/deeprvat_preprocess_config.yaml" + + +load_samtools = config.get("samtools_load_cmd") or "" +load_bcftools = config.get("bcftools_load_cmd") or "" +zcat_cmd = config.get("zcat_cmd") or "zcat" + +preprocessing_cmd = "deeprvat_preprocess" + +working_dir = Path(config["working_dir"]) +data_dir = Path(config["data_dir"]) +preprocessed_dir = working_dir / config["preprocessed_dir_name"] +metadata_dir = data_dir / config["metadata_dir_name"] +reference_dir = working_dir / config["reference_dir_name"] + +preprocess_threads = config["preprocess_threads"] + +fasta_file = reference_dir / config["reference_fasta_file"] +fasta_index_file = reference_dir / f"{config['reference_fasta_file']}.fai" + +norm_dir = working_dir / config["norm_dir_name"] +sparse_dir = norm_dir / config["sparse_dir_name"] +bcf_dir = norm_dir / "bcf" +norm_variants_dir = norm_dir / "variants" + +qc_dir = working_dir / "qc" +qc_indmiss_stats_dir = qc_dir / "indmiss/stats" +qc_indmiss_samples_dir = qc_dir / "indmiss/samples" +qc_indmiss_sites_dir = qc_dir / "indmiss/sites" +qc_varmiss_dir = qc_dir / "varmiss" +qc_hwe_dir = qc_dir / "hwe" +qc_read_depth_dir = qc_dir / "read_depth" +qc_allelic_imbalance_dir = qc_dir / "allelic_imbalance" +qc_duplicate_vars_dir = qc_dir / "duplicate_vars" +qc_filtered_samples_dir = qc_dir / "filtered_samples" + + +with open(config["vcf_files_list"]) as file: + vcf_files = [Path(line.rstrip()) for line in file] + vcf_stems = [vf.stem.replace(".vcf", "") for vf in vcf_files] + + assert len(vcf_stems) == len(vcf_files) + + vcf_look_up = {stem: file for stem, file in zip(vcf_stems, vcf_files)} + +chromosomes = config["included_chromosomes"] + + +rule combine_genotypes: + input: + expand( + preprocessed_dir / "genotypes_chr{chr}.h5", + chr=chromosomes, + ), + output: + preprocessed_dir / "genotypes.h5", + shell: + f"{preprocessing_cmd} combine-genotypes {{input}} {{output}}" + + +rule normalize: + input: + samplefile=norm_dir / "samples_chr.csv", + fasta=fasta_file, + fastaindex=fasta_index_file, + params: + vcf_file=lambda wildcards: vcf_look_up[wildcards.vcf_stem], + output: + bcf_file=bcf_dir / "{vcf_stem}.bcf", + shell: + f"""{load_bcftools} bcftools view --samples-file {{input.samplefile}} --output-type u {{params.vcf_file}} | bcftools view --include 'COUNT(GT="alt") > 0' --output-type u | bcftools norm -m-both -f {{input.fasta}} --output-type b --output {{output.bcf_file}}""" + + +rule index_fasta: + input: + fasta=fasta_file, + output: + fasta_index_file, + shell: + f"{load_samtools} samtools faidx {{input.fasta}}" + + +rule sparsify: + input: + bcf=bcf_dir / "{vcf_stem}.bcf", + output: + tsv=sparse_dir / "{vcf_stem}.tsv.gz", + shell: + f"""{load_bcftools} bcftools query --format '[%CHROM\t%POS\t%REF\t%ALT\t%SAMPLE\t%GT\n]' --include 'GT!="RR" & GT!="mis"' {{input.bcf}} \ + | sed 's/0[/,|]1/1/; s/1[/,|]0/1/; s/1[/,|]1/2/; s/0[/,|]0/0/' | gzip > {{output.tsv}}""" + + +rule variants: + input: + bcf=bcf_dir / "{vcf_stem}.bcf", + output: + norm_variants_dir / "{vcf_stem}.tsv.gz", + shell: + f"{load_bcftools} bcftools query --format '%CHROM\t%POS\t%REF\t%ALT\n' {{input}} | gzip > {{output}}" + + +rule concatenate_variants: + input: + expand(norm_variants_dir / "{vcf_stem}.tsv.gz", vcf_stem=vcf_stems), + output: + norm_variants_dir / "variants_no_id.tsv.gz", + shell: + "{zcat_cmd} {input} | gzip > {output}" + + +rule add_variant_ids: + input: + norm_variants_dir / "variants_no_id.tsv.gz", + output: + variants=norm_variants_dir / "variants.tsv.gz", + duplicates=qc_duplicate_vars_dir / "duplicates.tsv", + shell: + f"{preprocessing_cmd} add-variant-ids {{input}} {{output.variants}} {{output.duplicates}}" + + +rule create_parquet_variant_ids: + input: + norm_variants_dir / "variants_no_id.tsv.gz", + output: + variants=norm_variants_dir / "variants.parquet", + duplicates=qc_duplicate_vars_dir / "duplicates.parquet", + shell: + f"{preprocessing_cmd} add-variant-ids {{input}} {{output.variants}} {{output.duplicates}}" + + +rule extract_samples: + input: + vcf_files, + output: + norm_dir / "samples_chr.csv", + shell: + f"{load_bcftools} bcftools query --list-samples {{input}} > {{output}}" diff --git a/pipelines/preprocessing/qc.snakefile b/pipelines/preprocessing/qc.snakefile new file mode 100644 index 00000000..fe7d3cc6 --- /dev/null +++ b/pipelines/preprocessing/qc.snakefile @@ -0,0 +1,43 @@ + + +rule qc_allelic_imbalance: + input: + bcf_dir / "{vcf_stem}.bcf", + output: + qc_allelic_imbalance_dir / "{vcf_stem}.tsv.gz", + shell: + f"""{load_bcftools} bcftools query --format '%CHROM\t%POS\t%REF\t%ALT\n' --exclude 'COUNT(GT="het")=0 || (GT="het" & ((TYPE="snp" & (FORMAT/AD[*:1] / FORMAT/AD[*:0]) > 0.15) | (TYPE="indel" & (FORMAT/AD[*:1] / FORMAT/AD[*:0]) > 0.20)))' {{input}} | gzip > {{output}}""" + + +rule qc_varmiss: + input: + bcf_dir / "{vcf_stem}.bcf", + output: + qc_varmiss_dir / "{vcf_stem}.tsv.gz", + shell: + f'{load_bcftools} bcftools query --format "%CHROM\t%POS\t%REF\t%ALT\n" --include "F_MISSING >= 0.1" {{input}} | gzip > {{output}}' + + +rule qc_hwe: + input: + bcf_dir / "{vcf_stem}.bcf", + output: + qc_hwe_dir / "{vcf_stem}.tsv.gz", + shell: + f'{load_bcftools} bcftools +fill-tags --output-type u {{input}} -- --tags HWE | bcftools query --format "%CHROM\t%POS\t%REF\t%ALT\n" --include "INFO/HWE <= 1e-15" | gzip > {{output}}' + + +rule qc_read_depth: + input: + bcf_dir / "{vcf_stem}.bcf", + output: + qc_read_depth_dir / "{vcf_stem}.tsv.gz", + shell: + f"""{load_bcftools} bcftools query --format '[%CHROM\\t%POS\\t%REF\\t%ALT\\t%SAMPLE\\n]' --include '(GT!="RR" & GT!="mis" & TYPE="snp" & FORMAT/DP < 7) | (GT!="RR" & GT!="mis" & TYPE="indel" & FORMAT/DP < 10)' {{input}} | gzip > {{output}}""" + + +rule create_excluded_samples_dir: + output: + directory(qc_filtered_samples_dir), + shell: + "mkdir -p {output}" diff --git a/tests/test_data/preprocessing/get_file_chr/chr1_sample.tsv b/tests/test_data/preprocessing/get_file_chr/chr1_sample.tsv new file mode 100644 index 00000000..5ab5ad6e --- /dev/null +++ b/tests/test_data/preprocessing/get_file_chr/chr1_sample.tsv @@ -0,0 +1,4 @@ +100103 chr1 T G 100103 1 +100103 chr1 T A 100096 1 +100103 chr1 T A 100103 1 +100103 chr1 T A 100104 1 diff --git a/tests/test_data/preprocessing/get_file_chr/chr2_sample.tsv b/tests/test_data/preprocessing/get_file_chr/chr2_sample.tsv new file mode 100644 index 00000000..e2e99ea8 --- /dev/null +++ b/tests/test_data/preprocessing/get_file_chr/chr2_sample.tsv @@ -0,0 +1,4 @@ +100103 chr2 T G 100103 1 +100103 chr2 T A 100096 1 +100103 chr2 T A 100103 1 +100103 chr2 T A 100104 1 diff --git a/tests/test_data/preprocessing/get_file_chr/no_chr_sample.tsv b/tests/test_data/preprocessing/get_file_chr/no_chr_sample.tsv new file mode 100644 index 00000000..e69de29b diff --git a/tests/test_preprocess.py b/tests/test_preprocess.py index a150f279..626d17bd 100644 --- a/tests/test_preprocess.py +++ b/tests/test_preprocess.py @@ -2,6 +2,7 @@ import pandas as pd from pandas.testing import assert_frame_equal from deeprvat.preprocessing.preprocess import cli as preprocess_cli +import deeprvat.preprocessing.preprocess as deeprvat_preprocessing from click.testing import CliRunner from pathlib import Path import h5py @@ -309,3 +310,72 @@ def test_process_and_combine_sparse_gt( assert np.array_equal(written_variant_matrix, expected_data["variant_matrix"]) assert np.array_equal(written_genotype_matrix, expected_data["genotype_matrix"]) assert np.array_equal(written_samples, expected_data["samples"]) + + +@pytest.mark.parametrize( + "input_file_path, expected_chrom, col_names, chrom_field", + [ + ( + "add_variant_ids/add_variant_ids_tsv/input/variants_no_id.tsv.gz", + "chr1", + ["chrom", "id", "ref", "alt"], + "chrom", + ), + ( + "add_variant_ids/add_variant_ids_parquet/input/variants_no_id.tsv.gz", + "chr1", + ["chrom", "id", "ref", "alt"], + "chrom", + ), + ( + "process_and_combine_sparse_gt/no_filters_minimal_split/input/sparse_gt/chr1/input_c1_b1.tsv.gz", + "chr1", + ["chrom", "id", "ref", "alt", "variant", "stuff"], + "chrom", + ), + ( + "process_and_combine_sparse_gt/no_filters_minimal_split/input/sparse_gt/chr1/input_c1_b2.tsv.gz", + "chr1", + ["chrom", "id", "ref", "alt", "variant", "stuff"], + "chrom", + ), + ( + "process_and_combine_sparse_gt/no_filters_minimal_split/input/sparse_gt/chr2/input_c2_b1.tsv.gz", + "chr2", + ["chrom", "id", "ref", "alt", "variant", "stuff"], + "chrom", + ), + ( + "get_file_chr/chr1_sample.tsv", + "chr1", + ["id", "chrom", "ref", "alt", "variant", "stuff"], + "chrom", + ), + ( + "get_file_chr/chr1_sample.tsv", + "chr1", + ["id", "chromosome", "ref", "alt", "variant", "stuff"], + "chromosome", + ), + ( + "get_file_chr/chr2_sample.tsv", + "chr2", + ["id", "chrom", "ref", "alt", "variant", "stuff"], + "chrom", + ), + ( + "get_file_chr/no_chr_sample.tsv", + None, + ["id", "chrom", "ref", "alt", "variant", "stuff"], + "chrom", + ), + ], +) +def test_get_file_chromosome(input_file_path, expected_chrom, col_names, chrom_field): + chrom_file = tests_data_dir / input_file_path + + chrom = deeprvat_preprocessing.get_file_chromosome( + chrom_file, col_names=col_names, chrom_field=chrom_field + ) + + assert chrom == expected_chrom