Skip to content

Commit

Permalink
Make the preprocessing less UKBB-specific (#39)
Browse files Browse the repository at this point in the history
* Create vcf_files_list.txt

* Delete pvcf_blocks.txt

* Update deeprvat_preprocess_config.yaml

* normalize works

* All but two working

* add preprocess rule

* Don't use chr dirs

* black

* Fix exclude calls filtering

* Use exclude_calls_file_cols

* Add combine genotype

* add get_file_chromosome test

* Add support for specifying chr field

* Remove vcf dir

* Add check for nr of files

* fix typo

* use stem

* Update preprocess.snakefile

* Update deeprvat_preprocess_config.yaml

* Refactor pipelines

* Preprocess with and without qc

* Update github-actions.yml

* Update deeprvat_preprocess_config.yaml

remove gzcat from example

* Remove exclude samples from no_qc pipeline

* Update preprocessing docs

* Update preprocessing.md

* snakefmt

* Skip job lib for test

* fixup! Format Python code with psf/black pull_request

* revert changes to parallel

* Use pyarrow for reading csvs

* Remoce joblib temporary

* Revert "Remoce joblib temporary"

This reverts commit f3cba5d.

---------

Co-authored-by: PMBio <PMBio@users.noreply.github.com>
  • Loading branch information
endast and PMBio authored Dec 8, 2023
1 parent 2fb87e6 commit c2ef088
Show file tree
Hide file tree
Showing 17 changed files with 782 additions and 431 deletions.
57 changes: 53 additions & 4 deletions .github/workflows/github-actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,19 @@ jobs:
steps:
- name: Check out repository code
uses: actions/checkout@v3
- name: Preprocessing Smoke Test
- name: Preprocessing Smoke Test With QC
uses: snakemake/snakemake-github-action@v1.24.0
with:
directory: 'example/preprocess'
snakefile: 'pipelines/preprocess.snakefile'
snakefile: 'pipelines/preprocess_with_qc.snakefile'
args: '-j 2 -n --configfile pipelines/config/deeprvat_preprocess_config.yaml'
stagein: 'touch example/preprocess/workdir/reference/GRCh38.primary_assembly.genome.fa'

- name: Preprocessing Smoke Test No QC
uses: snakemake/snakemake-github-action@v1.24.0
with:
directory: 'example/preprocess'
snakefile: 'pipelines/preprocess_no_qc.snakefile'
args: '-j 2 -n --configfile pipelines/config/deeprvat_preprocess_config.yaml'
stagein: 'touch example/preprocess/workdir/reference/GRCh38.primary_assembly.genome.fa'

Expand All @@ -98,7 +106,48 @@ jobs:
args: '-j 2 -n --configfile pipelines/config/deeprvat_annotation_config.yaml'


DeepRVAT-Preprocessing-Pipeline-Tests:
DeepRVAT-Preprocessing-Pipeline-Tests-No-QC:
runs-on: ubuntu-latest
needs: DeepRVAT-Preprocessing-Pipeline-Smoke-Tests
steps:

- name: Check out repository code
uses: actions/checkout@v3
- uses: mamba-org/setup-micromamba@v1.4.3
with:
environment-name: deeprvat-preprocess-gh-action
environment-file: ${{ github.workspace }}/deeprvat_preprocessing_env.yml
cache-environment: true
cache-downloads: true

- name: Install DeepRVAT
run: pip install -e ${{ github.workspace }}
shell: micromamba-shell {0}

- name: Cache Fasta file
id: cache-fasta
uses: actions/cache@v3
with:
path: example/preprocess/workdir/reference
key: ${{ runner.os }}-reference-fasta

- name: Download and unpack fasta data
if: steps.cache-fasta.outputs.cache-hit != 'true'
run: |
cd ${{ github.workspace }}/example/preprocess && \
wget https://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_44/GRCh38.primary_assembly.genome.fa.gz \
-O workdir/reference/GRCh38.primary_assembly.genome.fa.gz \
&& gzip -d workdir/reference/GRCh38.primary_assembly.genome.fa.gz
- name: Run preprocessing pipeline
run: |
python -m snakemake -j 2 --directory ${{ github.workspace }}/example/preprocess \
--snakefile ${{ github.workspace }}/pipelines/preprocess_no_qc.snakefile \
--configfile ${{ github.workspace }}/pipelines/config/deeprvat_preprocess_config.yaml --show-failed-logs
shell: micromamba-shell {0}


DeepRVAT-Preprocessing-Pipeline-Tests-With-QC:
runs-on: ubuntu-latest
needs: DeepRVAT-Preprocessing-Pipeline-Smoke-Tests
steps:
Expand Down Expand Up @@ -134,6 +183,6 @@ jobs:
- name: Run preprocessing pipeline
run: |
python -m snakemake -j 2 --directory ${{ github.workspace }}/example/preprocess \
--snakefile ${{ github.workspace }}/pipelines/preprocess.snakefile \
--snakefile ${{ github.workspace }}/pipelines/preprocess_with_qc.snakefile \
--configfile ${{ github.workspace }}/pipelines/config/deeprvat_preprocess_config.yaml --show-failed-logs
shell: micromamba-shell {0}
94 changes: 61 additions & 33 deletions deeprvat/preprocessing/preprocess.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import gc
import logging
import os
import sys
import time
from pathlib import Path
Expand Down Expand Up @@ -49,10 +48,10 @@ def process_sparse_gt_file(
samples: List[str],
calls_to_exclude: pd.DataFrame = None,
) -> Tuple[List[np.ndarray], List[np.ndarray]]:
sparse_gt = pd.read_csv(
sparse_gt = pd.read_table(
file,
names=["chrom", "pos", "ref", "alt", "sample", "gt"],
sep="\t",
engine="pyarrow",
index_col=None,
)
sparse_gt = sparse_gt[sparse_gt["sample"].isin(samples)]
Expand Down Expand Up @@ -146,6 +145,18 @@ def add_variant_ids(variant_file: str, out_file: str, duplicates_file: str):
)


def get_file_chromosome(file, col_names, chrom_field="chrom"):
    """Return the chromosome value of the first data row of a TSV file.

    Only the first row is inspected, so this assumes each file holds data
    for a single chromosome — TODO confirm against the callers that use it
    to bucket files by chromosome.

    :param file: Path to a headerless, tab-separated file.
    :param col_names: Column names to assign to the file's columns
        (must contain ``chrom_field``).
    :param chrom_field: Name of the chromosome column within ``col_names``.
    :return: The chromosome value of the first row, or ``None`` if the
        parsed frame is empty.
    """
    # nrows=1 + usecols keeps this cheap even for very large files: only
    # the first line and the single chromosome column are parsed.
    chrom_df = pd.read_csv(
        file, names=col_names, sep="\t", index_col=None, nrows=1, usecols=[chrom_field]
    )

    if chrom_df.empty:
        return None

    # .iat is positional access; the original chrom_df[chrom_field][0] was a
    # label lookup that only worked because of the default RangeIndex.
    return chrom_df[chrom_field].iat[0]


@cli.command()
@click.option("--exclude-variants", type=click.Path(exists=True), multiple=True)
@click.option("--exclude-samples", type=click.Path(exists=True))
Expand All @@ -171,19 +182,24 @@ def process_sparse_gt(
):
logging.info("Reading variants...")
start_time = time.time()
variants = pd.read_csv(variant_file, sep="\t")

variants = pd.read_table(variant_file, engine="pyarrow")

# Filter all variants based on chromosome
if chromosomes is not None:
chromosomes = [f"chr{chrom}" for chrom in chromosomes.split(",")]
variants = variants[variants["chrom"].isin(chromosomes)]
total_variants = len(variants)

if len(exclude_variants) > 0:
variant_exclusion_files = [
v for directory in exclude_variants for v in Path(directory).glob("*.tsv*")
v for directory in exclude_variants for v in Path(directory).rglob("*.tsv*")
]

exclusion_file_cols = ["chrom", "pos", "ref", "alt"]
variants_to_exclude = pd.concat(
[
pd.read_csv(v, sep="\t", names=["chrom", "pos", "ref", "alt"])
pd.read_csv(v, sep="\t", names=exclusion_file_cols)
for v in tqdm(variant_exclusion_files)
],
ignore_index=True,
Expand Down Expand Up @@ -212,7 +228,7 @@ def process_sparse_gt(
if exclude_samples is not None:
total_samples = len(samples)

if sample_exclusion_files := list(Path(exclude_samples).glob("*.csv")):
if sample_exclusion_files := list(Path(exclude_samples).rglob("*.csv")):
samples_to_exclude = set(
pd.concat(
[
Expand All @@ -236,39 +252,51 @@ def process_sparse_gt(
for chrom in tqdm(variant_groups.groups.keys()):
logging.info(f"Processing chromosome {chrom}")
logging.info("Reading in filtered calls")

exclude_calls_file_cols = ["chrom", "pos", "ref", "alt", "sample"]

calls_to_exclude = pd.DataFrame(columns=exclude_calls_file_cols)

if exclude_calls is not None:
chrom_dir = os.path.join(exclude_calls, chrom)
exclude_calls_chrom = Path(chrom_dir).glob("*.tsv*")

calls_to_exclude = pd.concat(
[
pd.read_csv(
c,
names=["chrom", "pos", "ref", "alt", "sample"],
sep="\t",
index_col=None,
)
for c in tqdm(exclude_calls_chrom)
],
ignore_index=True,
)
exclude_calls_chrom = Path(exclude_calls).rglob("*.tsv*")
exclude_calls_chrom_files = []
for exclude_call_file in exclude_calls_chrom:
file_chrom = get_file_chromosome(
exclude_call_file, col_names=exclude_calls_file_cols
)

calls_dropped = len(calls_to_exclude)
total_calls_dropped += calls_dropped
logging.info(f"Dropped {calls_dropped} calls")
else:
calls_to_exclude = pd.DataFrame(
columns=["chrom", "pos", "ref", "alt", "sample"]
)
if file_chrom == chrom:
exclude_calls_chrom_files.append(exclude_call_file)

logging.info("Processing sparse GT files")
if exclude_calls_chrom_files:
calls_to_exclude = pd.concat(
[
pd.read_csv(
c,
names=exclude_calls_file_cols,
sep="\t",
index_col=None,
)
for c in tqdm(exclude_calls_chrom_files)
],
ignore_index=True,
)
calls_dropped = len(calls_to_exclude)
total_calls_dropped += calls_dropped
logging.info(f"Dropped {calls_dropped} calls")

chrom_dir = os.path.join(sparse_gt, chrom)
logging.info(f"chrom dir is {chrom_dir}")
logging.info("Processing sparse GT files")

variants_chrom = variant_groups.get_group(chrom)

sparse_gt_chrom = list(Path(chrom_dir).glob("*.tsv*"))
sparse_file_cols = ["chrom", "pos", "ref", "alt", "sample", "gt"]

sparse_gt_chrom = [
f
for f in Path(sparse_gt).rglob("*.tsv*")
if get_file_chromosome(f, col_names=sparse_file_cols) == chrom
]

logging.info(f"sparse gt chrom(es) are: {sparse_gt_chrom}")

processed = Parallel(n_jobs=threads, verbose=50)(
Expand Down
Loading

0 comments on commit c2ef088

Please sign in to comment.