diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml
index 65ad7bbb..3708d48b 100644
--- a/.github/workflows/github-actions.yml
+++ b/.github/workflows/github-actions.yml
@@ -8,7 +8,7 @@ jobs:
steps:
- name: Check out repository code
uses: actions/checkout@v4
- - uses: mamba-org/setup-micromamba@v1.8.0
+ - uses: mamba-org/setup-micromamba@v1.8.1
with:
environment-name: deeprvat-gh-action
environment-file: ${{ github.workspace }}/deeprvat_env_no_gpu.yml
@@ -42,7 +42,7 @@ jobs:
steps:
- name: Check out repository code
uses: actions/checkout@v4
- - uses: mamba-org/setup-micromamba@v1.8.0
+ - uses: mamba-org/setup-micromamba@v1.8.1
with:
environment-name: deeprvat-gh-action
environment-file: ${{ github.workspace }}/deeprvat_env_no_gpu.yml
@@ -83,7 +83,7 @@ jobs:
steps:
- name: Check out repository code
uses: actions/checkout@v4
- - uses: mamba-org/setup-micromamba@v1.8.0
+ - uses: mamba-org/setup-micromamba@v1.8.1
with:
environment-name: deeprvat-preprocess-gh-action
environment-file: ${{ github.workspace }}/deeprvat_preprocessing_env.yml
@@ -116,7 +116,7 @@ jobs:
steps:
- name: Check out repository code
uses: actions/checkout@v4
- - uses: mamba-org/setup-micromamba@v1.8.0
+ - uses: mamba-org/setup-micromamba@v1.8.1
with:
environment-name: deeprvat-preprocess-gh-action
environment-file: ${{ github.workspace }}/deeprvat_preprocessing_env.yml
@@ -136,7 +136,7 @@ jobs:
steps:
- name: Check out repository code
uses: actions/checkout@v4
- - uses: mamba-org/setup-micromamba@v1.8.0
+ - uses: mamba-org/setup-micromamba@v1.8.1
with:
environment-name: deeprvat-preprocess-gh-action
environment-file: ${{ github.workspace }}/deeprvat_preprocessing_env.yml
@@ -177,7 +177,7 @@ jobs:
- name: Check out repository code
uses: actions/checkout@v4
- - uses: mamba-org/setup-micromamba@v1.8.0
+ - uses: mamba-org/setup-micromamba@v1.8.1
with:
environment-name: deeprvat-preprocess-gh-action
environment-file: ${{ github.workspace }}/deeprvat_preprocessing_env.yml
diff --git a/.github/workflows/test-runner.yml b/.github/workflows/test-runner.yml
index 02b59ac0..32e33474 100644
--- a/.github/workflows/test-runner.yml
+++ b/.github/workflows/test-runner.yml
@@ -8,7 +8,8 @@ jobs:
steps:
- name: Check out repository code
uses: actions/checkout@v4
- - uses: mamba-org/setup-micromamba@v1.8.0
+
+ - uses: mamba-org/setup-micromamba@v1.8.1
with:
environment-name: deeprvat-preprocess-gh-action
environment-file: ${{ github.workspace }}/deeprvat_env_no_gpu.yml
@@ -19,16 +20,16 @@ jobs:
run: pip install -e ${{ github.workspace }}
shell: micromamba-shell {0}
- name: Run pytest deeprvat
- run: pytest -v ${{ github.workspace }}/tests/deeprvat
+ run: pytest -n auto -v ${{ github.workspace }}/tests/deeprvat
shell: micromamba-shell {0}
DeepRVAT-Tests-Runner-Preprocessing:
runs-on: ubuntu-latest
steps:
-
- name: Check out repository code
uses: actions/checkout@v4
- - uses: mamba-org/setup-micromamba@v1.8.0
+
+ - uses: mamba-org/setup-micromamba@v1.8.1
with:
environment-name: deeprvat-preprocess-gh-action
environment-file: ${{ github.workspace }}/deeprvat_preprocessing_env.yml
@@ -42,3 +43,23 @@ jobs:
- name: Run pytest preprocessing
run: pytest -v ${{ github.workspace }}/tests/preprocessing
shell: micromamba-shell {0}
+
+ DeepRVAT-Tests-Runner-Annotations:
+ runs-on: ubuntu-latest
+ steps:
+ - name: Check out repository code
+ uses: actions/checkout@v4
+ - uses: mamba-org/setup-micromamba@v1.8.1
+ with:
+ environment-name: deeprvat-annotation-gh-action
+ environment-file: ${{ github.workspace }}/deeprvat_annotations.yml
+ cache-environment: true
+ cache-downloads: true
+
+ - name: Install DeepRVAT
+ run: pip install -e ${{ github.workspace }}
+ shell: micromamba-shell {0}
+
+ - name: Run pytest annotations
+ run: pytest -n auto -v ${{ github.workspace }}/tests/annotations
+ shell: micromamba-shell {0}
diff --git a/.readthedocs.yaml b/.readthedocs.yaml
index f9221ef3..4e3bcd48 100644
--- a/.readthedocs.yaml
+++ b/.readthedocs.yaml
@@ -7,7 +7,7 @@ version: 2
# Set the OS, Python version and other tools you might need
build:
- os: ubuntu-22.04
+ os: ubuntu-lts-latest
tools:
python: "3.12"
diff --git a/TODOS.md b/TODOS.md
deleted file mode 100644
index 34c92d66..00000000
--- a/TODOS.md
+++ /dev/null
@@ -1,114 +0,0 @@
-# DeepRVAT
-
-Rare variant association testing using deep learning and data-driven burden scores
-
-
-## Execution flow
-
-* [x] `preprocess.snakefile` (`preprocess.snakemake`) -> preprocess.py
- * `index_fasta`
- * `extract_samples`
- * `variants`
- * `concatinate_variants`
- * `add_variant_ids`
- * `create_parquet_variant_ids`
- * `normalize`
- * `sparsify`
- * `qc_allelic_imbalance`
- * `qc_read_depth`
- * `qc_hwe`
- * `qc_varmiss`
- * `qc_indmiss`
- * `preprocess`
- * `combine_genotype`
-* [x] `preprocess.py`
- * `add_variant_ids`
- * `process_sparse_gt`
- * `combine_gemotype`
-
-* [ ] Annotation pipeline - **Marcel**
- * annotation.snakefile
-* [x] `baseline.smk` - **Eva**
- * [x] `baseline_scoretest_smk.py` - **Eva**
- * `update-config`
- * `make-dataset`
- * `test-evaluate`
- * `combine-results`
- * [x] `evaluate_baseline.py` - **Eva**
-* [x] `train_multipheno.snakefile` - **Brian**
- * [x] `train_associate.py`
- * [x] rename to `config.py`
- * `update-config`
- * `make-dataset`
- * [x] Move to `train.py`
- * [x] `train.py` - **Brian**
- * `train-bagging`
- * [x] `pl-models.py` - **Felix**
-* [x] `associate_multipheno.snakefile` - **Brian**
- * `train.py`
- * `best-bagging-run`
- * `pl_models.py`
- * [x] `data/__init__.py`
- * [x] `data/dense_gt.py` - **Brian**
- * [x] `data/rare.py` - **Brian**
- * [x] `metrics.py`
- * [x] `utils.py`
- * [x] `univariate_burden_regression.py`
- * `make-dataset`
- * `compute-burdens`
- * `regress`
- * `combine-regression-results`
- * [x] Remove all logic for splitting training and replication - **Brian**
- * [x] `evaluate_new.py` - **Brian**
- * [x] Rename
- * [x] Remove testing/replication split
- * [x] `compare_discoveries.py` - **Brian**
- * [x] Merge into `evaluate_new.py`
-
-
-## Cleanup
-
-For each file:
-* Remove orphaned functions/rules
- 1. Remove rules not needed in pipelines
- 1. Remove click commands not used in any pipeline
- 1. Remove functions not used in any Click command
-* Remove unused imports, global variables, command/function arguments
-* Remove all comments that are not explanations of code
-* Remove `resources` from snakefiles
-* Convert `genopheno` imports to corresponding `deeprvat` imports
-* Reformat files with black/snakefmt
-
-## TODO
-
-* Usage with pretrained models:
- * [ ] Create an association testing pipeline
- * [x] Upload pretrained models
-* Documentation
- * [ ] Installation
- * [ ] Basic usage
- * [ ] More detailed docs
-* [ ] Acknowledge code used from SEAK
-* [x] Merge training/association snakefiles - **Brian**
- * [x] Create entry points
-* [x] Create a minimal conda env with all required packages
-* [x] Create example files showing the input data format - **Brian**
- * `annotations.parquet`
- * `protein_coding_genes.parquet`
- * `genotypes.h5`
- * `variants.parquet`
- * `config.yaml`
- * add `do_scoretest: True`
-* [ ] Run an example pipeline to make sure everything still works
-* [x] Replace `py = ...` in `train_associate.snakefile` with placeholder (user should enter path to their clone of `deeprvat` repo) - **Brian**
- * Maybe even add command-line entry points?
-
-Lower priority:
-* Really clean up files to remove features that aren't used, esp.:
- * `univariate_burden_regression.py`
- * Remove testing/replication split
- * `data/dense_gt.py`
-* Improve `update_config` function to help simplify config files (i.e., give the minimal config and generate everything else that's needed for `hpopt_config.yaml`)
-* Refactor certain files, e.g.
- * `data/rare.py`
-* Resolve any TODOs commented in the files
diff --git a/deeprvat/annotations/annotations.py b/deeprvat/annotations/annotations.py
index 91d0b1ba..2ca4f2f8 100644
--- a/deeprvat/annotations/annotations.py
+++ b/deeprvat/annotations/annotations.py
@@ -1,12 +1,14 @@
import logging
import os
+
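+# Silence TensorFlow's C++ INFO messages; this must be set before tensorflow/keras is imported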
+os.environ["TF_CPP_MIN_LOG_LEVEL"] = "1"
import pickle
import random
import sys
import time
from pathlib import Path
from typing import Optional
-
+import dask.dataframe as dd
import numpy as np
import click
import keras.backend as K
@@ -16,10 +18,33 @@
from joblib import Parallel, delayed
from keras.models import load_model
from sklearn.decomposition import PCA
-from tqdm import tqdm
+from tqdm import tqdm, trange
+from fastparquet import ParquetFile
+import yaml
def precision(y_true, y_pred):
+ """
+ Calculate precision, a metric for the accuracy of the positive predictions.
+
+    Precision is defined as the fraction of relevant instances among the retrieved instances.
+
+ Parameters:
+ - y_true (Tensor): The true labels (ground truth).
+ - y_pred (Tensor): The predicted labels.
+
+ Returns:
+ float: Precision value.
+
+ Notes:
+ - This function uses the Keras backend functions to perform calculations.
+ - Precision is calculated as `true_positives / (predicted_positives + epsilon)`, where epsilon is a small constant to avoid division by zero.
+
+
+ References:
+ - https://en.wikipedia.org/wiki/Precision_and_recall
+
+ """
true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
@@ -28,6 +53,26 @@ def precision(y_true, y_pred):
def recall(y_true, y_pred):
+ """
+ Calculate recall, a metric for the ability to capture true positive instances.
+
+ Recall is defined as the fraction of relevant instances that were retrieved.
+
+ Parameters:
+ - y_true (Tensor): The true labels (ground truth).
+ - y_pred (Tensor): The predicted labels.
+
+ Returns:
+ - float: Recall value.
+
+ Notes:
+ - This function uses the Keras backend functions to perform calculations.
+ - Recall is calculated as `true_positives / (possible_positives + epsilon)`, where epsilon is a small constant to avoid division by zero.
+
+
+ References:
+ - https://en.wikipedia.org/wiki/Precision_and_recall
+ """
true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
@@ -36,6 +81,28 @@ def recall(y_true, y_pred):
def deepripe_get_model_info(saved_models_dict, saved_deepripe_models_path):
+ """
+ Retrieve information about the paths and names of saved deepRiPe models.
+
+ Parameters:
+ - saved_models_dict (dict): A dictionary containing keys for different types of models. Keys include "parclip" for PAR-CLIP models, "eclip_hg2" for eCLIP models in HepG2, and "eclip_k5" for eCLIP models in K562. Values are model identifiers.
+ - saved_deepripe_models_path (str): The path to the directory where the deepRiPe models are saved.
+
+ Returns:
+ tuple: A tuple containing two dictionaries.
+ The first dictionary contains paths for each type of model, with keys
+ "parclip", "eclip_hg2", and "eclip_k5" and values as lists of paths corresponding to high,
+ medium, and low sequence models.
+ The second dictionary contains lists of RBP names for each type of model, with keys
+ "parclip", "eclip_hg2", and "eclip_k5" and values as lists of RBP names for high, medium, and
+ low sequence models.
+
+ Notes:
+ - The function constructs file paths based on the provided model identifiers.
+ - The resulting dictionary structure allows easy access to model paths for different types.
+
+
+ """
shared_path = Path(saved_deepripe_models_path)
# parclip
@@ -106,7 +173,7 @@ def deepripe_get_model_info(saved_models_dict, saved_deepripe_models_path):
"G45",
"XPO5",
]
- )
+    )  # 27 RBP names
pc_RBPnames_med = np.array(
[
@@ -358,7 +425,27 @@ def deepripe_get_model_info(saved_models_dict, saved_deepripe_models_path):
def seq_to_1hot(seq, randomsel=True):
- "converts the sequence to one-hot encoding"
+ """
+ Convert a nucleotide sequence to one-hot encoding.
+
+ Parameters:
+ - seq (str): The input nucleotide sequence.
+    - randomsel (bool): If True, an ambiguous base is treated as a random base.
+                       If False, all-zero rows are returned for ambiguous bases.
+
+ Returns:
+ numpy.ndarray: A 2D array representing the one-hot encoding of the input sequence.
+ Rows correspond to nucleotides 'A', 'C', 'G', 'T' in that order.
+ Columns correspond to positions in the input sequence.
+
+ Notes:
+ - Ambiguous bases are handled based on the 'randomsel' parameter.
+
+
+
+ References:
+ - one-hot encoding: https://en.wikipedia.org/wiki/One-hot
+ """
seq_len = len(seq)
seq = seq.upper()
@@ -380,14 +467,29 @@ def seq_to_1hot(seq, randomsel=True):
def convert2bed(variants_file, output_dir):
+ """
+ Convert a variants file to BED format.
+
+ Parameters:
+ - variants_file (str): The path to the input variants file.
+ - output_dir (str): The directory where the BED file will be saved.
+
+ Returns:
+ None
+
+ Notes:
+ - The input variants file should be in tab-separated format with columns: "#CHROM", "POS", "ID", "REF", "ALT".
+ - The generated BED file will have columns: "CHR", "Start", "End", "ID", "VAR", "Strand".
+ - The "Start" and "End" columns are set to the "POS" values, and "Strand" is set to '.' for all entries.
+ """
file_name = variants_file.split("/")[-1]
- logger.info(f"Generating BED file: {output_dir}/{file_name[:-3]}bed")
+ print(f"Generating BED file: {output_dir}/{file_name[:-3]}bed")
df_variants = pd.read_csv(
variants_file, sep="\t", names=["#CHROM", "POS", "ID", "REF", "ALT"]
- ) # hg38
+ )
- logger.debug(df_variants.head())
+ print(df_variants.head())
df_bed = pd.DataFrame()
df_bed["CHR"] = df_variants["#CHROM"].astype(str)
@@ -403,6 +505,27 @@ def convert2bed(variants_file, output_dir):
def deepripe_encode_variant_bedline(bedline, genomefasta, flank_size=75):
+ """
+ Encode a variant bedline into one-hot encoded sequences.
+
+ Parameters:
+ - bedline (list): A list representing a variant bedline, containing elements for chromosome, start position, end position, reference allele, alternate allele, and strand.
+ - genomefasta (str): The path to the genome FASTA file for sequence retrieval.
+ - flank_size (int): The size of flanking regions to include in the sequence around the variant position.
+
+ Returns:
+ numpy.ndarray: A 3D array representing one-hot encoded sequences. The dimensions are (num_sequences, sequence_length, nucleotide_channels).
+
+ Notes:
+ - The input bedline should follow the format: [chromosome, start position, end position, reference allele, alternate allele, strand].
+ - The function retrieves the wild-type and mutant sequences flanked by the specified size.
+ - The wild-type sequence is extracted from the genome FASTA file and mutated at the variant position.
+ - The resulting sequences are one-hot encoded and returned as a numpy array.
+
+ References:
+ - pybedtools.BedTool: https://daler.github.io/pybedtools/main.html
+    - FASTA format: https://en.wikipedia.org/wiki/FASTA_format
+ """
mut_a = bedline[4].split("/")[1]
strand = bedline[5]
if len(mut_a) == 1:
@@ -440,39 +563,22 @@ def deepripe_encode_variant_bedline(bedline, genomefasta, flank_size=75):
return encoded_seqs
-def deepripe_score_variant_onlyseq_all(
- model_group, variant_bed, genomefasta, seq_len=200, n_jobs=32
-):
- predictions = {}
- encoded_seqs_list = Parallel(n_jobs=n_jobs, verbose=10)(
- delayed(deepripe_encode_variant_bedline)(
- bedline, genomefasta, flank_size=(seq_len // 2) + 2
- )
- for bedline in variant_bed
- )
- encoded_seqs_list = [
- (x if x is not None else np.ones((2, seq_len + 4, 4)) * float("nan"))
- for x in encoded_seqs_list
- ]
- encoded_seqs = tf.concat(encoded_seqs_list, 0)
+def readYamlColumns(annotation_columns_yaml_file):
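+    """
+    Read the annotation column configuration from a YAML file.
+
+    The file is expected to contain a top-level key "annotation_column_names" that maps each
+    original column name to a single-entry dict of {new column name: fill value}, e.g.
+    (illustrative names only):
+
+        annotation_column_names:
+          CADD_PHRED:
+            CADD_PHRED: 0
+
+    Returns:
+        tuple: (prior_names, post_names, fill_vals, column_name_mapping, fill_value_mapping),
+        where column_name_mapping maps original to new column names and fill_value_mapping
+        maps new column names to their fill values.
+    """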
+ with open(annotation_columns_yaml_file, "r") as fd:
+ config = yaml.safe_load(fd)
+ columns = config["annotation_column_names"]
+ prior_names = list(columns.keys())
+ post_names = [list(columns[k].keys())[0] for k in columns]
+ fill_vals = [list(columns[k].values())[0] for k in columns]
+ column_name_mapping = dict(zip(prior_names, post_names))
+ fill_value_mapping = dict(zip(post_names, fill_vals))
+ return prior_names, post_names, fill_vals, column_name_mapping, fill_value_mapping
- logger.info("Computing predictions")
- ## shifting around (seq_len+4) 4 bases
- for choice in tqdm(model_group.keys(), desc="Model group"):
- avg_score = 0.0
- for i in range(4):
- cropped_seqs = encoded_seqs[:, i : i + seq_len, :]
- model, _ = model_group[choice]
- pred = model.predict_on_batch(cropped_seqs)
- wild_indices = tf.range(pred.shape[0], delta=2)
- mut_indices = tf.range(1, pred.shape[0], delta=2)
- pred_wild = pred[wild_indices, :]
- pred_mut = pred[mut_indices, :]
- score = pred_mut - pred_wild
- avg_score += score
- predictions[choice] = avg_score / 4
- return predictions
+def get_parquet_columns(parquet_file):
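+    """
+    Return the column names of a Parquet file without loading its data.
+
+    Parameters:
+    - parquet_file (str): Path to the Parquet file.
+
+    Returns:
+        list: Column names as reported by fastparquet's ParquetFile metadata.
+    """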
+ pfile = ParquetFile(parquet_file)
+ pcols = pfile.columns
+ return pcols
@click.group()
@@ -481,22 +587,177 @@ def cli():
@cli.command()
-@click.option("--n-components", type=int, default=100)
+@click.argument("anno_path", type=click.Path(exists=True))
+@click.argument("gtf_path", type=click.Path(exists=True))
+@click.argument("genes_path", type=click.Path(exists=True))
+@click.argument("output_path", type=click.Path(exists=False))
+@click.option("--max_dist", type=int, default=300)
+def filter_annotations_by_exon_distance(
+ anno_path: str, gtf_path: str, genes_path: str, output_path: str, max_dist: int
+) -> None:
+ """
+    Filters annotations based on the distance to the nearest exon of the gene they are associated with.
+
+ Args:
+ anno_path (str): Annotation parquet file containing variant annotations to filter.
+ gtf_path (str): GTF file containing start and end positions of all relevant exons of all relevant genes. DataFrame is filtered for protein coding exons.
+ genes_path (str): List of protein coding genes and their IDs in the annotation DataFrame.
+ output_path (str): Where to write the resulting parquet file.
+        max_dist (int): Maximum distance (in base pairs) from the nearest exon for a variant to be retained.
+
+ Returns:
+ None
+
+ Writes:
+ Parquet file containing filtered annotations.
+ """
+ import pyranges as pr
+
+ logger.info("read gtf file as pandas df")
+ gtf = pr.read_gtf(gtf_path)
+ gtf = gtf.as_df()
+
+ logger.info("filter gtf for protein coding exons from the HAVANA DB")
+ gtf = gtf.query(
+ "Source == 'HAVANA' and Feature == 'exon' and gene_type == 'protein_coding' and transcript_type == 'protein_coding'"
+ )
+
+ logger.info("split gene ID column on '.'")
+ gtf[["gene_base", "feature"]] = gtf["gene_id"].str.split(".", expand=True)
+
+ logger.info(" read protein_coding_genes")
+ pcg = pd.read_parquet(genes_path, columns=["gene", "id"])
+
+ logger.info(" only select necessary columns, rename to fit gtf file")
+ gtf = gtf[["gene_id", "Start", "End"]].rename(columns={"gene_id": "gene"})
+
+ logger.info(" add gene ids to gtf file")
+
+ gtf = gtf.merge(pcg, on="gene")
+
+ logger.info(" only select necessary columns, rename to fit gtf file")
+ gtf = gtf[["Start", "End", "id"]].rename(columns={"id": "gene_id"})
+
+ logger.info("reading annotations to filter ")
+ anno_df = pd.read_parquet(anno_path)
+ anno_df = anno_df[["id", "pos", "gene_id"]]
+
+ logger.info("adding exons to annotations (1:M merge)")
+
+ merged = anno_df.merge(gtf, how="left", on="gene_id")
+ del anno_df
+
+ logger.info(
+ "adding positons of start and end of each exon relative to variant position to df"
+ )
+ merged["start_diff"] = merged["Start"] - merged["pos"]
+ merged["end_diff"] = merged["End"] - merged["pos"]
+
+ logger.info(
+ f"filtering all rows that are further than {max_dist}bp away from each exon "
+ )
+ len_bf_filtering = len(merged)
+ filtered_merge = merged.query(
+ "(start_diff <= 0 & end_diff >= 0) | abs(start_diff) <= @max_dist | abs(end_diff) <= @max_dist"
+ )
+ del merged
+ len_after_filtering = len(filtered_merge)
+ logger.info(
+ f"filtered rows by exon distance ({max_dist}bp), dropped({len_bf_filtering - len_after_filtering} rows / {np.round(100*(len_bf_filtering - len_after_filtering)/len_bf_filtering)}%)"
+ )
+
+ logger.info("select necessary columns, drop duplicates")
+ filtered_merge = filtered_merge[["id", "gene_id"]]
+ filtered_merge = filtered_merge.drop_duplicates()
+ logger.info(
+ f"dropped dublicates in data frame (dropped {len_after_filtering - len(filtered_merge)}rows/ {np.round(100*(len_after_filtering - len(filtered_merge))/len_after_filtering)}%)."
+ )
+
+ logger.info("Reading in annotations for filtering")
+ anno_df = pd.read_parquet(anno_path)
+ len_anno = len(anno_df)
+ filtered = anno_df.merge(filtered_merge, on=["id", "gene_id"], how="left")
+
+ logger.info(
+ f"filtered annotations based on filterd id, gene_id (dropped {len(anno_df) - len(filtered)} / {np.round(100*(len(anno_df)-len(filtered))/len(anno_df))}% of rows)."
+ )
+ logger.info("performing sanity check")
+    assert len(filtered) == len_anno
+ logger.info(f"writing result to {output_path}")
+ filtered.to_parquet(output_path)
+
+
+@cli.command()
@click.argument("deepsea-file", type=click.Path(exists=True))
@click.argument("pca-object", type=click.Path())
+@click.argument("means_sd_df", type=click.Path())
@click.argument("out-dir", type=click.Path(exists=True))
-def deepsea_pca(n_components: int, deepsea_file: str, pca_object: str, out_dir: str):
+@click.option("--n-components", type=int, default=100)
+def deepsea_pca(
+ deepsea_file: str,
+ pca_object: str,
+ means_sd_df: str,
+ out_dir: str,
+ n_components: int,
+):
+ """
+ Perform Principal Component Analysis (PCA) on DeepSEA data and save the results.
+
+ Parameters:
+ - n_components (int): Number of principal components to retain, default is 100.
+ - deepsea_file (str): Path to the DeepSEA data in parquet format.
+ - pca_object (str): Path to save or load the PCA object (components) in npy or pickle format.
+    - means_sd_df (str): Path to a DataFrame containing pre-calculated means and SDs for standardization. If the path does not exist, the mean and SD are calculated from the data, used for standardization, and saved to this path.
+ - out_dir (str): Path to the output directory where the PCA results will be saved.
+
+ Returns:
+ None
+
+ Raises:
+ AssertionError: If there are NaN values in the PCA results DataFrame.
+
+ Notes:
+ - If 'means_sd_df' is provided, the data will be standardized using the existing mean and SD. Otherwise, the data will be standardized using the mean and SD calculated from the data.
+ - If 'pca_object' exists, it will be loaded as a PCA object. If it doesn't exist, a new PCA object will be created, and its components will be saved to 'pca_object'.
+
+ Example:
+ $ python annotations.py deepsea_pca --n-components 50 deepsea_data.parquet pca_components.npy means_sd.parquet results/
+ """
logger.info("Loading deepSea data")
- df = pd.read_csv(deepsea_file)
+ df = pd.read_parquet(deepsea_file)
logger.info("filling NAs")
df = df.fillna(0)
logger.info("Extracting matrix for PCA")
- key_df = df[["chrom", "pos", "ref", "alt", "id"]].reset_index(drop=True)
+ key_df = df[["#CHROM", "POS", "REF", "ALT"]].reset_index(drop=True)
logger.info("transforming values to numpy")
- X = df[[c for c in df.columns if c.startswith("DeepSEA")]].to_numpy()
+ deepSEAcols = [c for c in df.columns if c.startswith("DeepSEA")]
+ X = df[deepSEAcols].to_numpy()
del df
- logger.info("standardizing values")
- X_std = (X - np.mean(X, axis=0)) / np.std(X, axis=0)
+ logger.info(
+ "checking wether input contains data frame with pre-calculated means and SDs"
+ )
+ if os.path.exists(means_sd_df):
+ logger.info("standardizing values using existing mean and SD")
+ means_sd_data = pd.read_parquet(means_sd_df)
+
+ means = means_sd_data["means"].to_numpy()
+ sds = means_sd_data["SDs"].to_numpy()
+ del means_sd_data
+ X_std = (X - means) / sds
+ del means
+ del sds
+
+ else:
+ logger.info("standardizing values")
+ X_std = (X - np.mean(X, axis=0)) / np.std(X, axis=0)
+ means_sd_data = pd.DataFrame(
+ {
+ "names": deepSEAcols,
+ "means": np.mean(X, axis=0),
+ "SDs": np.std(X, axis=0),
+ }
+ )
+ means_sd_data.to_parquet(means_sd_df)
del X
out_path = Path(out_dir)
@@ -504,7 +765,7 @@ def deepsea_pca(n_components: int, deepsea_file: str, pca_object: str, out_dir:
if os.path.exists(pca_object):
if ".pkl" in pca_object:
with open(pca_object, "rb") as pickle_file:
- logger.info("loading pca objects pickle file")
+ logger.info("loading pca objectas pickle file")
pca = pickle.load(pickle_file)
X_pca = pca.transform(X_std)
else:
@@ -559,6 +820,31 @@ def scorevariants_deepripe(
n_jobs: int,
saved_model_type: str = "parclip",
):
+ """
+ Score variants using deep learning models trained on PAR-CLIP and eCLIP data.
+
+ Parameters:
+ - variants_file (str): Path to the file containing variant information to be annotated.
+ - output_dir (str): Path to the output directory where the results will be saved.
+ - genomefasta (str): Path to the reference genome in FASTA format.
+ - pybedtools_tmp_dir (str): Path to the temporary directory for pybedtools.
+ - saved_deepripe_models_path (str): Path to the directory containing saved deepRiPe models.
+ - n_jobs (int): Number of parallel jobs for scoring variants.
+ - saved_model_type (str, optional): Type of the saved deepRiPe model to use (parclip, eclip_hg2, eclip_k5). Default is "parclip".
+
+ Returns:
+ None
+
+ Raises:
+ AssertionError: If there are NaN values in the generated DataFrame.
+
+ Notes:
+ - This function scores variants using deepRiPe models trained on different CLIP-seq datasets.
+ - The results are saved as a CSV file in the specified output directory.
+
+ Example:
+ $ python annotations.py scorevariants_deepripe variants.csv results/ reference.fasta tmp_dir/ saved_models/ 8 eclip_k5
+ """
file_name = variants_file.split("/")[-1]
bed_file = f"{output_dir}/{file_name[:-3]}bed"
@@ -582,7 +868,7 @@ def scorevariants_deepripe(
convert2bed(variants_file, output_dir)
variant_bed = pybedtools.BedTool(bed_file)
- logger.info(f"Scoring variants for: {bed_file}")
+ print(f"Scoring variants for: {bed_file}")
### paths for experiments
saved_models_dict = {
@@ -651,16 +937,16 @@ def scorevariants_deepripe(
)
for choice in current_model_type.keys():
- logger.debug(choice)
+ print(choice)
_, RBPnames = current_model_type[choice]
score_list = predictions[choice]
score_list = np.asarray(score_list)
- logger.info(f"Output size: {score_list.shape}")
+ print(f"Output size: {score_list.shape}")
### write predictions to df
for ix, RBP_name in enumerate(RBPnames):
df_variants[RBP_name] = score_list[:, ix]
- logger.info(
+ print(
f"saving file to: {output_dir}/{file_name[:-3]}{saved_model_type}_deepripe.csv.gz"
)
df_variants.to_csv(
@@ -675,8 +961,32 @@ def process_chunk(
tissue_agg_function,
ca_shortened,
):
+ """
+ Process a chunk of data from absplice site results and merge it with the remaining annotation data.
+
+ Parameters:
+ - chrom_file (str): The filename for the chunk of absplice site results.
+ - abs_splice_res_dir (Path): The directory containing the absplice site results.
+ - tissues_to_exclude (list): List of tissues to exclude from the absplice site results.
+ - tissue_agg_function (str): The aggregation function to use for tissue-specific AbSplice scores.
+ - ca_shortened (DataFrame): The remaining annotation data to merge with the absplice site results.
+
+ Returns:
+ DataFrame: Merged DataFrame containing aggregated tissue-specific AbSplice scores and remaining annotation data.
+
+ Notes:
+ - The function reads the absplice site results for a specific chromosome, excludes specified tissues, and aggregates AbSplice scores using the specified tissue aggregation function.
+ - The resulting DataFrame is merged with the remaining annotation data based on the chromosome, position, reference allele, alternative allele, and gene ID.
+
+ Example:
+ merged_data = process_chunk("chr1_results.csv", Path("abs_splice_results/"), ["Brain", "Heart"], "max", ca_shortened_df)
+ """
logger.info(f"Reading file {chrom_file}")
- ab_splice_res = pd.read_csv(abs_splice_res_dir / chrom_file).reset_index()
+
+ ab_splice_res = pd.read_csv(
+ abs_splice_res_dir / chrom_file, engine="pyarrow"
+ ).reset_index()
+
ab_splice_res = ab_splice_res.query("tissue not in @tissues_to_exclude")
logger.info(
f"AbSplice tissues excluded: {tissues_to_exclude}, Aggregating AbSplice scores using {tissue_agg_function}"
@@ -706,24 +1016,43 @@ def process_chunk(
f"Number of unique variants(variant) in merged {len(merged['variant'].unique())}"
)
- del ab_splice_res
-
return merged
+
@cli.command()
@click.argument("current_annotation_file", type=click.Path(exists=True))
@click.argument("abs_splice_res_dir", type=click.Path(exists=True))
-@click.argument("out_file", type=click.Path())
@click.argument("absplice_score_file", type=click.Path())
@click.argument("njobs", type=int)
-def get_abscores(
+def aggregate_abscores(
current_annotation_file: str,
abs_splice_res_dir: str,
- out_file: str,
absplice_score_file: str,
njobs: int,
):
+ """
+ Aggregate AbSplice scores from AbSplice results and save the results.
+
+ Parameters:
+ - current_annotation_file (str): Path to the current annotation file in parquet format.
+ - abs_splice_res_dir (str): Path to the directory containing AbSplice results.
+ - absplice_score_file (str): Path to save the aggregated AbSplice scores in parquet format.
+ - njobs (int): Number of parallel jobs for processing AbSplice results.
+
+ Returns:
+ None
+
+ Notes:
+ - The function reads the current annotation file and extracts necessary information for merging.
+ - It then processes AbSplice results in parallel chunks, aggregating AbSplice scores.
+ - The aggregated scores are saved to the specified file.
+
+ Example:
+ $ python annotations.py aggregate_abscores annotations.parquet abs_splice_results/ absplice_scores.parquet 4
+ """
current_annotation_file = Path(current_annotation_file)
logger.info("reading current annotations file")
current_annotations = pd.read_parquet(current_annotation_file)
@@ -734,52 +1063,152 @@ def get_abscores(
current_annotations = current_annotations.rename(
columns={"AbSplice_DNA": "AbSplice_DNA_old"}
)
- ca_shortened = current_annotations[["id", "gene_id", "chrom", "pos", "ref", "alt"]]
+ ca_shortened = current_annotations[["id", "Gene", "chrom", "pos", "ref", "alt"]]
+ ca_shortened = ca_shortened.rename(columns={"Gene": "gene_id"})
logger.info(ca_shortened.columns)
abs_splice_res_dir = Path(abs_splice_res_dir)
tissue_agg_function = "max"
+ tissues_to_exclude = ["Testis"]
tissues_to_exclude = []
ab_splice_agg_score_file = absplice_score_file
- if not Path(ab_splice_agg_score_file).exists():
- logger.info("creating abSplice score file.. ")
-
- parallel = Parallel(n_jobs=njobs, return_as="generator")
- output_generator = parallel(
- delayed(process_chunk)(
- i,
- abs_splice_res_dir,
- tissues_to_exclude,
- tissue_agg_function,
- ca_shortened,
- )
- for i in tqdm(os.listdir(abs_splice_res_dir))
+ logger.info("creating abSplice score file.. ")
+ all_absplice_scores = []
+ parallel = Parallel(n_jobs=njobs, return_as="generator", verbose=50)
+ output_generator = parallel(
+ delayed(process_chunk)(
+ i, abs_splice_res_dir, tissues_to_exclude, tissue_agg_function, ca_shortened
+ )
+ for i in tqdm(sorted(os.listdir(abs_splice_res_dir)))
+ )
+ all_absplice_scores = list(output_generator)
+
+ logger.info("concatenating files")
+ all_absplice_scores = pd.concat(all_absplice_scores)
+ logger.info(f"saving score file to {ab_splice_agg_score_file}")
+ all_absplice_scores.to_parquet(ab_splice_agg_score_file)
+
+
+logging.basicConfig(
+ format="[%(asctime)s] %(levelname)s:%(name)s: %(message)s",
+ level="INFO",
+ stream=sys.stdout,
+)
+logger = logging.getLogger(__name__)
+
+
+def deepripe_score_variant_onlyseq_all(
+ model_group, variant_bed, genomefasta, seq_len=200, batch_size=1024, n_jobs=32
+):
+ """
+ Compute variant scores using a deep learning model for each specified variant.
+
+ Parameters:
+ - model_group (dict): A dictionary containing deep learning models for different choices. Each entry should be a key-value pair, where the key is the choice name and the value is a tuple containing the model and additional information.
+ - variant_bed (list): A list of variant bedlines, where each bedline represents a variant.
+ - genomefasta (str): Path to the reference genome in FASTA format.
+ - seq_len (int, optional): The length of the sequence to use around each variant. Default is 200.
+ - batch_size (int, optional): Batch size for parallelization. Default is 1024.
+ - n_jobs (int, optional): Number of parallel jobs for processing variant bedlines. Default is 32.
+
+ Returns:
+ dict: A dictionary containing variant scores for each choice in the model_group.
+ Each entry has the choice name as the key and the corresponding scores as the value.
+ """
+ predictions = {}
+
+ # Parallelize the encoding of variant bedlines
+ encoded_seqs_list = Parallel(n_jobs=n_jobs, verbose=10)(
+ delayed(deepripe_encode_variant_bedline)(
+ bedline, genomefasta, flank_size=(seq_len // 2) + 2
)
- all_absplice_scores = list(output_generator)
+ for bedline in variant_bed
+ )
- logger.info("concatenating files")
- all_absplice_scores = pd.concat(all_absplice_scores)
- logger.info(f"saving score file to {ab_splice_agg_score_file}")
- all_absplice_scores.to_parquet(ab_splice_agg_score_file)
+ # Handle cases where encoding is None
+ encoded_seqs_list = [
+ (x if x is not None else np.ones((2, seq_len + 4, 4)) * float("nan"))
+ for x in encoded_seqs_list
+ ]
+
+ # Concatenate encoded sequences
+ encoded_seqs = tf.concat(encoded_seqs_list, 0)
+
+ logger.info("Computing predictions")
+ # Compute predictions for each choice in the model group
+ for choice in tqdm(model_group.keys(), desc="Model group"):
+ avg_score = 0.0
+ for i in range(4):
+ cropped_seqs = encoded_seqs[:, i : i + seq_len, :]
+ model, _ = model_group[choice]
+ pred = model.predict_on_batch(cropped_seqs)
+ wild_indices = tf.range(pred.shape[0], delta=2)
+ mut_indices = tf.range(1, pred.shape[0], delta=2)
+ pred_wild = pred[wild_indices, :]
+ pred_mut = pred[mut_indices, :]
+ score = pred_mut - pred_wild
+ avg_score += score
+ predictions[choice] = avg_score / 4
+
+ return predictions
+
+
+def calculate_scores_max(scores):
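+    """
+    Extract the maximum SpliceAI delta score from a pipe-delimited prediction string.
+
+    Parameters:
+    - scores (str or None): SpliceAI_pred-style string; fields 1-4 (0-based) are taken as
+      the delta scores, as assumed by this pipeline.
+
+    Returns:
+        float or None: Maximum of the parseable delta scores, np.NaN if none are available,
+        or None if the input is None.
+    """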
+ if scores is None:
+ return None
else:
- logger.info("reading existing abSplice Score file")
- all_absplice_scores = pd.read_parquet(ab_splice_agg_score_file)
+        # Split the string and take the four delta scores (fields 1 to 4), skipping "nan"
+        values = [float(score) for score in scores.split("|")[1:5] if score != "nan"]
+        # Return the maximum of the extracted scores, or NaN if none are available
+ if len(values) > 0:
+ return np.max(values)
+ else:
+ return np.NaN
+
+
+@cli.command()
+@click.argument("current_annotation_file", type=click.Path(exists=True))
+@click.argument("absplice_score_file", type=click.Path())
+@click.argument("out_file", type=click.Path())
+def merge_abscores(
+ current_annotation_file: str,
+ absplice_score_file: str,
+ out_file: str,
+):
+ """
+ Merge AbSplice scores with the current annotation file and save the result.
+
+ Parameters:
+ - current_annotation_file (str): Path to the current annotation file in parquet format.
+ - absplice_score_file (str): Path to the AbSplice scores file in parquet format.
+ - out_file (str): Path to save the merged annotation file with AbSplice scores.
+
+ Returns:
+ None
+
+ Notes:
+ - The function reads AbSplice scores and the current annotation file.
+ - It merges the AbSplice scores with the current annotation file based on chromosome, position, reference allele, alternative allele, and gene ID.
+ - The merged file is saved with AbSplice scores.
+
+ Example:
+ $ python annotations.py merge_abscores current_annotation.parquet absplice_scores.parquet merged_annotations.parquet
+ """
+ all_absplice_scores = pd.read_parquet(absplice_score_file)
all_absplice_scores = all_absplice_scores[
- ["chrom", "pos", "ref", "alt", "gene_id", "AbSplice_DNA", "id"]
+ ["chrom", "pos", "ref", "alt", "gene_id", "AbSplice_DNA"]
]
annotations = pd.read_parquet(current_annotation_file, engine="pyarrow").drop(
columns=["AbSplice_DNA"], errors="ignore"
)
- annotations.drop_duplicates(
- inplace=True, subset=["chrom", "pos", "ref", "alt", "gene_id", "id"]
- )
-
+ annotations = annotations.rename(columns={"Gene": "gene_id"})
+ annotations.drop_duplicates(inplace=True, subset=["gene_id", "id"])
original_len = len(annotations)
logger.info("Merging")
@@ -788,12 +1217,17 @@ def get_abscores(
all_absplice_scores,
validate="1:1",
how="left",
- on=["chrom", "pos", "ref", "alt", "gene_id", "id"],
+ on=["chrom", "pos", "ref", "alt", "gene_id"],
)
logger.info("Sanity checking merge")
assert len(merged) == original_len
- assert len(merged[["gene_id", "id"]].drop_duplicates()) == len(merged)
+ logger.info(
+ f"len of merged after dropping duplicates: {len(merged.drop_duplicates(subset=['id', 'gene_id']))}"
+ )
+ logger.info(f"len of merged without dropping duplicates: {len(merged)}")
+
+ assert len(merged.drop_duplicates(subset=["id", "gene_id"])) == len(merged)
logger.info(
f'Filling {merged["AbSplice_DNA"].isna().sum()} '
@@ -810,53 +1244,6 @@ def get_abscores(
pd.options.mode.chained_assignment = None
-logging.basicConfig(
- format="[%(asctime)s] %(levelname)s:%(name)s: %(message)s",
- level="INFO",
- stream=sys.stdout,
-)
-logger = logging.getLogger(__name__)
-
-
-@cli.command()
-@click.option("--n-components", type=int, default=59)
-@click.argument("deepripe-file", type=click.Path(exists=True))
-@click.argument("out-dir", type=click.Path(exists=True))
-def deepripe_pca(n_components: int, deepripe_file: str, out_dir: str):
- logger.info("Reading deepripe file")
- df = pd.read_csv(deepripe_file)
- df = df.drop(["Uploaded_variant"], axis=1)
- logger.debug(df.columns)
- df = df.dropna()
- key_df = df[["chrom", "pos", "ref", "alt", "id"]].reset_index(drop=True)
-
- logger.info("Extracting matrix for PCA")
- X = df[[c for c in df.columns if c not in key_df.columns]].to_numpy()
- del df
- logger.info("transforming columns to z scores")
- X_std = (X - np.mean(X, axis=0)) / np.std(X, axis=0)
- del X
-
- logger.info("Running PCA")
- pca = PCA(n_components=n_components)
- pca.fit(X_std)
- out_path = Path(out_dir)
- with open(out_path / "pca.pkl", "wb") as f:
- pickle.dump(pca, f)
-
- logger.info(f"Projecting rows to {n_components} PCs")
- X_pca = pca.transform(X_std)
- del X_std
- pca_df = pd.DataFrame(
- X_pca, columns=[f"DeepRipe_PC_{i}" for i in range(1, n_components + 1)]
- )
- del X_pca
- pca_df = pd.concat([key_df, pca_df], axis=1)
- pca_df.to_parquet(out_path / "deepripe_pca.parquet", engine="pyarrow")
-
- logger.info("Done")
-
-
@cli.command()
@click.argument("annotation_file", type=click.Path(exists=True))
@click.argument("deepripe_file", type=click.Path(exists=True))
@@ -865,6 +1252,27 @@ def deepripe_pca(n_components: int, deepripe_file: str, out_dir: str):
def merge_deepripe(
annotation_file: str, deepripe_file: str, out_file: str, column_prefix: str
):
+ """
+ Merge deepRiPe scores with an annotation file and save the result.
+
+ Parameters:
+ - annotation_file (str): Path to the annotation file in parquet format.
+ - deepripe_file (str): Path to the deepRiPe scores file in CSV format.
+ - out_file (str): Path to save the merged file with deepRiPe scores.
+ - column_prefix (str): Prefix to add to the deepRiPe score columns in the merged file.
+
+ Returns:
+ None
+
+ Notes:
+ - The function reads the annotation file and deepRiPe scores file.
+ - It renames the columns in the deepRiPe scores file with the specified prefix.
+ - The two dataframes are merged based on chromosome, position, reference allele, alternative allele, and variant ID.
+ - The merged file is saved with deepRiPe scores.
+
+ Example:
+ $ python annotations.py merge_deepripe annotations.parquet deepripe_scores.csv merged_deepripe.parquet deepripe
+ """
annotations = pd.read_parquet(annotation_file)
deepripe_df = pd.read_csv(deepripe_file)
orig_len = len(annotations)
@@ -888,41 +1296,99 @@ def merge_deepripe(
@cli.command()
@click.argument("annotation_file", type=click.Path(exists=True))
@click.argument("deepripe_pca_file", type=click.Path(exists=True))
+@click.argument("column_yaml_file", type=click.Path(exists=True))
@click.argument("out_file", type=click.Path())
-def merge_deepsea_pcas(annotation_file: str, deepripe_pca_file: str, out_file: str):
- annotations = pd.read_parquet(annotation_file)
- deepripe_pcas = pd.read_parquet(deepripe_pca_file)
+def merge_deepsea_pcas(
+ annotation_file: str, deepripe_pca_file: str, column_yaml_file: str, out_file: str
+):
+ """
+ Merge deepRiPe PCA scores with an annotation file and save the result.
+
+ Parameters:
+ - annotation_file (str): Path to the annotation file in parquet format.
+ - deepripe_pca_file (str): Path to the deepRiPe PCA scores file in parquet format.
+    - column_yaml_file (str): Path to the YAML file listing all columns needed for the model, including their fill values.
+ - out_file (str): Path to save the merged file with deepRiPe PCA scores.
+
+ Returns:
+ None
+
+ Notes:
+ - The function reads the annotation file and deepRiPe PCA scores file.
+    - It drops duplicates based on chromosome, position, reference allele, alternative allele, and variant ID (plus gene ID for the annotation file).
+ - The two dataframes are merged based on chromosome, position, reference allele, alternative allele, and variant ID.
+ - The merged file is saved with deepRiPe PCA scores.
+
+ Example:
+        $ python annotations.py merge_deepsea_pcas annotations.parquet deepripe_pca_scores.parquet column_config.yaml merged_deepsea_pcas.parquet
+ """
+
+ pcols = get_parquet_columns(deepripe_pca_file)
+ anno_cols = get_parquet_columns(annotation_file)
+ logger.info("reading current annotations")
+ prior_names, *_ = readYamlColumns(column_yaml_file)
+
+ DScommonCols = list(set(prior_names).intersection(set(pcols)))
+ AnnoCommonCols = list(set(prior_names).intersection(set(anno_cols)))
+ annotations = pd.read_parquet(
+ annotation_file,
+ columns=AnnoCommonCols + ["chrom", "pos", "ref", "alt", "id", "Gene"],
+ )
+ logger.info("reading PCAs")
+ deepripe_pcas = pd.read_parquet(
+ deepripe_pca_file, columns=DScommonCols + ["chrom", "pos", "ref", "alt", "id"]
+ )
+ deepripe_pcas = deepripe_pcas.drop_duplicates(
+ subset=["chrom", "pos", "ref", "alt", "id"]
+ )
orig_len = len(annotations)
+ logger.info(f"length of annotation file before merge: {orig_len}")
+ annotations = annotations.drop_duplicates(
+ subset=["chrom", "pos", "ref", "alt", "id", "Gene"]
+ )
+ noduplicates_len = len(annotations)
+ logger.info(
+ f"length of annotation file after dropping duplicates: {noduplicates_len}"
+ )
+ logger.info("merging")
merged = annotations.merge(
deepripe_pcas, how="left", on=["chrom", "pos", "ref", "alt", "id"]
)
- merged.rename(columns={"Gene": "gene_id"}, inplace=True)
-
- assert len(merged) == orig_len
+ logger.info(f"length of annotation file after merge: {len(merged)}")
+ logger.info("checking lengths")
+ assert len(merged) == noduplicates_len
+ logger.info(f"writing file to {out_file}")
merged.to_parquet(out_file)
-@cli.command()
-@click.argument("in_variants", type=click.Path(exists=True))
-@click.argument("out_variants", type=click.Path())
-def process_annotations(in_variants: str, out_variants: str):
- variant_path = Path(in_variants)
- variants = pd.read_parquet(variant_path)
-
- logger.info("filtering for canonical variants")
+def process_chunk_addids(chunk: pd.DataFrame, variants: pd.DataFrame) -> pd.DataFrame:
+ """
+ Process a chunk of data by adding identifiers from a variants dataframe.
- variants = variants.loc[variants.CANONICAL == "YES"]
- variants.rename(columns={"Gene": "gene_id"}, inplace=True)
+ Parameters:
+ - chunk (pd.DataFrame): Chunk of data containing variant information.
+ - variants (pd.DataFrame): Dataframe containing variant identifiers.
- logger.info("aggregating consequences for different alleles")
+ Returns:
+ pd.DataFrame: Processed chunk with added variant identifiers.
- # combining variant id with gene id
- variants["censequence_id"] = variants["id"].astype(str) + variants["gene_id"]
- variants.to_parquet(out_variants)
+ Raises:
+ AssertionError: If the shape of the processed chunk does not match expectations.
+ Notes:
+ - The function renames columns for compatibility.
+ - Drops duplicates in the chunk based on the key columns.
+ - Merges the chunk with the variants dataframe based on the key columns.
+ - Performs assertions to ensure the shape of the processed chunk meets expectations.
-def process_chunk_addids(chunk, variants):
+ Example:
+ ```python
+ chunk = pd.read_csv("chunk_data.csv")
+ variants = pd.read_csv("variants_data.csv")
+ processed_chunk = process_chunk_addids(chunk, variants)
+ ```
+ """
chunk = chunk.rename(
columns={
"#CHROM": "chrom",
@@ -934,8 +1400,10 @@ def process_chunk_addids(chunk, variants):
}
)
key_cols = ["chrom", "pos", "ref", "alt"]
+
chunk.drop_duplicates(subset=key_cols, inplace=True)
chunk_shape = chunk.shape
+
chunk = pd.merge(chunk, variants, on=key_cols, how="left", validate="1:1")
try:
@@ -961,56 +1429,147 @@ def process_chunk_addids(chunk, variants):
@click.argument("njobs", type=int)
@click.argument("out_file", type=click.Path())
def add_ids(annotation_file: str, variant_file: str, njobs: int, out_file: str):
- data = pd.read_csv(annotation_file, chunksize=10_000)
+ """
+ Add identifiers from a variant file to an annotation file and save the result.
+
+ Parameters:
+ - annotation_file (str): Path to the input annotation file in CSV format.
+ - variant_file (str): Path to the input variant file in TSV format.
+ - njobs (int): Number of parallel jobs to process the data.
+ - out_file (str): Path to save the processed data in Parquet format.
+
+ Returns:
+ None
+
+ Notes:
+ - The function reads the annotation file in chunks and the entire variant file.
+ - It uses parallel processing to apply the 'process_chunk_addids' function to each chunk.
+ - The result is saved in Parquet format.
+
+ Example:
+ $ python annotations.py add_ids annotation_data.csv variant_data.tsv 4 processed_data.parquet
+ """
+
+ data = pd.read_csv(annotation_file, chunksize=100_000)
all_variants = pd.read_csv(variant_file, sep="\t")
- parallel = Parallel(n_jobs=njobs, return_as="generator")
+ parallel = Parallel(n_jobs=njobs, return_as="generator", verbose=50)
output_generator = parallel(
delayed(process_chunk_addids)(chunk, all_variants) for chunk in data
)
- pd.concat([batch for batch in output_generator]).to_csv(out_file, index=False)
+ first = True
+ for batch in tqdm(output_generator):
+ if first:
+ batch.to_parquet(out_file, engine="fastparquet")
+ else:
+ batch.to_parquet(out_file, engine="fastparquet", append=True)
+ first = False
@cli.command()
-@click.option("--included-chromosomes", type=str)
-@click.option("--comment-lines", is_flag=True)
-@click.option("--sep", type=str, default=",")
-@click.argument("annotation_dir", type=click.Path(exists=True))
-@click.argument("deepripe_name_pattern", type=str)
-@click.argument("pvcf-blocks_file", type=click.Path(exists=True))
+@click.argument("annotation_file", type=click.Path(exists=True))
+@click.argument("variant_file", type=click.Path(exists=True))
@click.argument("out_file", type=click.Path())
-def concatenate_deepripe(
- included_chromosomes: Optional[str],
- sep: str,
- comment_lines: bool,
- annotation_dir: str,
- deepripe_name_pattern: str,
- pvcf_blocks_file: str,
+def add_ids_dask(annotation_file: str, variant_file: str, out_file: str):
+ """
+ Add identifiers from a variant file to an annotation file using Dask and save the result.
+
+ Parameters:
+ - annotation_file (str): Path to the input annotation file in Parquet format.
+ - variant_file (str): Path to the input variant file in Parquet format.
+ - out_file (str): Path to save the processed data in Parquet format.
+
+ Returns:
+ None
+
+ Notes:
+ - The function uses Dask to read annotation and variant files with large block sizes.
+ - It renames columns for compatibility and drops duplicates based on key columns.
+ - Merges the Dask dataframes using the 'merge' function.
+ - The result is saved in Parquet format with compression.
+
+ Example:
+        $ python annotations.py add_ids_dask annotation_data.parquet variant_data.parquet processed_data.parquet
+ """
+ data = dd.read_parquet(annotation_file, blocksize=25e9)
+ all_variants = pd.read_parquet(variant_file)
+ data = data.rename(
+ columns={
+ "#CHROM": "chrom",
+ "POS": "pos",
+ "ID": "variant_name",
+ "REF": "ref",
+ "ALT": "alt",
+ "chr": "chrom",
+ }
+ )
+ key_cols = ["chrom", "pos", "ref", "alt"]
+    # Dask DataFrames do not support in-place operations, so reassign the result
+    data = data.drop_duplicates(subset=key_cols)
+ data = dd.merge(data, all_variants, on=key_cols, how="left")
+ data.to_parquet(out_file, engine="fastparquet", compression="zstd")
+
+
+def chunks(lst, n):
+ """
+ Split a list into chunks of size 'n'.
+
+ Parameters:
+ - lst (list): The input list to be split into chunks.
+ - n (int): The size of each chunk.
+
+ Yields:
+ list: A chunk of the input list.
+ """
+ for i in range(0, len(lst), n):
+ yield lst[i : i + n]
+
+
+def read_deepripe_file(f: str):
+ """
+ Read a DeepRipe file from the specified path.
+
+ Parameters:
+ - f (str): Path to the DeepRipe file.
+
+ Returns:
+ pd.DataFrame: DataFrame containing the data from the DeepRipe file.
+
+ Example:
+ ```python
+ file_path = "path/to/deepripe_file.txt"
+ deepripe_data = read_deepripe_file(file_path)
+ ```
+ """
+ f = pd.read_table(f, engine="c")
+ return f
+
+
+@cli.command()
+@click.argument("deepsea_files", type=str)
+@click.argument("out_file", type=click.Path())
+@click.argument("njobs", type=int)
+def concatenate_deepsea(
+ deepsea_files: str,
out_file: str,
+ njobs: int,
):
- annotation_dir = Path(annotation_dir)
+ """
+    Concatenate the given DeepSEA output files into a single Parquet file.
- logger.info("Reading variant file")
+ Parameters:
+    - deepsea_files (str): Comma-separated list of DeepSEA output files to concatenate.
+ - out_file (str): Path to save the concatenated output file in Parquet format.
+ - njobs (int): Number of parallel jobs for processing.
- logger.info("reading pvcf block file")
- pvcf_blocks_df = pd.read_csv(
- pvcf_blocks_file,
- sep="\t",
- header=None,
- names=["Index", "Chromosome", "Block", "First position", "Last position"],
- dtype={"Chromosome": str},
- ).set_index("Index")
- if included_chromosomes is not None:
- included_chromosomes = [int(c) for c in included_chromosomes.split(",")]
- pvcf_blocks_df = pvcf_blocks_df[
- pvcf_blocks_df["Chromosome"].isin([str(c) for c in included_chromosomes])
- ]
- pvcf_blocks = zip(pvcf_blocks_df["Chromosome"], pvcf_blocks_df["Block"])
- file_paths = [
- annotation_dir / deepripe_name_pattern.format(chr=p[0], block=p[1])
- for p in pvcf_blocks
- ]
+ Returns:
+ None
+
+ Example:
+        $ python annotations.py concatenate_deepsea chr1_block0.CLI.deepseapredict.diff.tsv,chr1_block1.CLI.deepseapredict.diff.tsv,chr1_block2.CLI.deepseapredict.diff.tsv concatenated_output.parquet 4
+ """
+
+ file_paths = deepsea_files.split(",")
logger.info("check if out_file already exists")
if os.path.exists(out_file):
logger.info("file exists, removing existing file")
@@ -1019,16 +1578,28 @@ def concatenate_deepripe(
logger.info("out_file does not yet exist")
logger.info("reading in f")
- for f in tqdm(file_paths):
- if comment_lines:
- current_file = pd.read_csv(f, comment="#", sep=sep, low_memory=False)
- else:
- current_file = pd.read_csv(f, sep=sep, low_memory=False)
- if f == file_paths[0]:
+
+ parallel = Parallel(n_jobs=njobs, backend="loky", return_as="generator")
+ chunked_files = list(chunks(file_paths, njobs))
+ logger.info(f"processing {len(chunked_files)} files")
+ for chunk in tqdm(chunked_files):
+ logger.info(f"Chunk consist of {len(chunk)} files")
+ this_generator = parallel((delayed(read_deepripe_file)(f) for f in chunk))
+ current_file = pd.concat(list(this_generator))
+ if chunk == chunked_files[0]:
logger.info("creating new file")
- current_file.to_csv(out_file, mode="a", index=False)
+ current_file.to_parquet(out_file, engine="fastparquet")
else:
- current_file.to_csv(out_file, mode="a", index=False, header=False)
+ try:
+ current_file.to_parquet(out_file, engine="fastparquet", append=True)
+ except ValueError:
+ out_df_columns = pd.read_parquet(out_file, engine="fastparquet").columns
+
+ logger.error(
+ f"columns are not equal in saved/appending file: {[i for i in out_df_columns if i not in current_file.columns]} and {[i for i in current_file.columns if i not in out_df_columns]} "
+ )
+
+ raise ValueError
@cli.command()
@@ -1038,7 +1609,9 @@ def concatenate_deepripe(
@click.argument("deepripe_hg2_file", type=click.Path(exists=True))
@click.argument("deepripe_k5_file", type=click.Path(exists=True))
@click.argument("variant_file", type=click.Path(exists=True))
+@click.argument("vcf_file", type=click.Path(exists=True))
@click.argument("out_file", type=click.Path())
+@click.option("--vepcols_to_retain", type=str)
def merge_annotations(
vep_header_line: int,
vep_file: str,
@@ -1046,42 +1619,61 @@ def merge_annotations(
deepripe_hg2_file: str,
deepripe_k5_file: str,
variant_file: str,
+ vcf_file: str,
out_file: str,
+ vepcols_to_retain: Optional[str],
):
+ """
+    Merge VEP, DeepRipe (parclip, hg2, k5), and variant files into one DataFrame and save the result as a Parquet file.
+
+ Parameters:
+ - vep_header_line (int): Line number of the header line in the VEP output file.
+ - vep_file (str): Path to the VEP file.
+ - deepripe_parclip_file (str): Path to the DeepRipe parclip file.
+ - deepripe_hg2_file (str): Path to the DeepRipe hg2 file.
+ - deepripe_k5_file (str): Path to the DeepRipe k5 file.
+ - variant_file (str): Path to the variant file.
+    - vcf_file (str): VCF-derived, tab-separated file containing chrom, pos, variant ID, ref, and alt information.
+ - out_file (str): Path to save the merged output file in Parquet format.
+ - vepcols_to_retain (Optional[str]): Comma-separated list of additional VEP columns to retain.
+
+ Returns:
+ None
+
+ Example:
+ $ python annotations.py merge_annotations 1 vep_file.tsv deepripe_parclip.csv deepripe_hg2.csv deepripe_k5.csv variant_file.tsv merged_output.parquet --vepcols_to_retain="AlphaMissense,PolyPhen"
+ """
# load vep file
- vep_df = pd.read_csv(
- vep_file,
- header=vep_header_line,
- sep="\t",
- na_values="-",
+ vep_df = pd.read_csv(vep_file, header=vep_header_line, sep="\t", na_values="-")
+ if vepcols_to_retain is not None:
+ vepcols_to_retain = [c for c in vepcols_to_retain.split(",")]
+ vep_df = process_vep(
+ vep_file=vep_df, vcf_file=vcf_file, vepcols_to_retain=vepcols_to_retain
)
- vep_df = process_vep(vep_file=vep_df)
logger.info(f"vep_df shape is {vep_df.shape}")
- # load deepripe_parclip
+ logger.info("load deepripe_parclip")
+
deepripe_parclip_df = pd.read_csv(deepripe_parclip_file)
deepripe_parclip_df = process_deepripe(deepripe_parclip_df, "parclip")
- # load deepripe_k5
+ logger.info("load deepripe_k5")
+
deepripe_k5_df = pd.read_csv(deepripe_k5_file)
deepripe_k5_df = process_deepripe(deepripe_k5_df, "k5")
- # load deepripe_hg2
+ logger.info("load deepripe_hg2")
+
deepripe_hg2_df = pd.read_csv(deepripe_hg2_file)
deepripe_hg2_df = process_deepripe(deepripe_hg2_df, "hg2")
- # load variant_file
- logger.info(f"reading in {variant_file}")
- variants = pd.read_csv(variant_file, sep="\t")
+ logger.info("load variant_file")
- # If variants start with chr
- # TODO Check if this is always true
- variants["chrom"] = variants["chrom"].str.replace("chr", "")
+ logger.info(f"reading in {variant_file}")
+ variants = pd.read_parquet(variant_file)
- # merge vep to variants M:1
+ logger.info("merge vep to variants M:1")
ca = vep_df.merge(
variants, how="left", on=["chrom", "pos", "ref", "alt"], validate="m:1"
)
del vep_df
- # merge deepripe files to variants 1:1
- logger.info(ca.columns)
- logger.info(deepripe_parclip_df.columns)
+ logger.info("merge deepripe files to variants 1:1")
ca = ca.merge(
deepripe_parclip_df,
how="left",
@@ -1095,10 +1687,23 @@ def merge_annotations(
deepripe_hg2_df, how="left", on=["chrom", "pos", "ref", "alt"], validate="m:1"
)
- ca.to_parquet(out_file)
+ ca.to_parquet(out_file, compression="zstd")
+
+def process_deepripe(deepripe_df: pd.DataFrame, column_prefix: str) -> pd.DataFrame:
+ """
+ Process the DeepRipe DataFrame, rename columns and drop duplicates.
-def process_deepripe(deepripe_df: object, column_prefix: str) -> object:
+ Parameters:
+ - deepripe_df (pd.DataFrame): DataFrame containing DeepRipe data.
+ - column_prefix (str): Prefix to be added to column names.
+
+ Returns:
+ pd.DataFrame: Processed DeepRipe DataFrame.
+
+ Example:
+ deepripe_df = process_deepripe(deepripe_df, "parclip")
+ """
logger.info("renaming deepripe columns")
deepripe_df = deepripe_df.rename(columns={"chr": "chrom"})
@@ -1113,46 +1718,65 @@ def process_deepripe(deepripe_df: object, column_prefix: str) -> object:
return deepripe_df
-def process_vep(vep_file: object) -> object:
- vep_file[["chrom", "pos", "ref", "alt"]] = (
- vep_file["#Uploaded_variation"]
- .str.replace("_", ":")
- .str.replace("/", ":")
- .str.split(":", expand=True)
- )
-
- vep_file["pos"] = vep_file["pos"].astype(int)
- logger.debug(vep_file.columns)
-
- vep_str_cols = [
- "CDS_position",
- "Protein_position",
- "Amino_acids",
- "Codons",
- "SYMBOL",
- "SYMBOL_SOURCE",
- "HGNC_ID",
- "MANE_SELECT",
- "APPRIS",
- "CCDS",
- "ENSP",
- "SWISSPROT",
- "TREMBL",
- "UNIPARC",
- "UNIPROT_ISOFORM",
- "RefSeq",
- "SIFT",
- "PolyPhen",
- "INTRON",
- "DOMAINS",
- "HGVSp",
- "SpliceAI_pred",
+def process_vep(
+ vep_file: pd.DataFrame, vcf_file: str, vepcols_to_retain: list = []
+) -> pd.DataFrame:
+ """
+ Process the VEP DataFrame, extracting relevant columns and handling data types.
+
+ Parameters:
+    - vep_file (pd.DataFrame): DataFrame containing VEP data.
+    - vcf_file (str): Path to the tab-separated file with chrom, pos, #Uploaded_variation, ref and alt columns.
+    - vepcols_to_retain (list, optional): List of additional columns to retain. Defaults to an empty list.
+
+ Returns:
+ pd.DataFrame: Processed VEP DataFrame.
+
+ Example:
+        vep_file = process_vep(vep_file, vcf_file="vcf_file.tsv", vepcols_to_retain=["additional_col1", "additional_col2"])
+ """
+ vcf_df = pd.read_table(
+ vcf_file, names=["chrom", "pos", "#Uploaded_variation", "ref", "alt"]
+ )
+ if "#Uploaded_variation" in vep_file.columns:
+ vep_file = vep_file.merge(vcf_df, on="#Uploaded_variation", how="left")
+ if vep_file.chrom.isna().sum() > 0:
+ vep_file.loc[vep_file.chrom.isna(), ["chrom", "pos", "ref", "alt"]] = (
+ vep_file[vep_file["chrom"].isna()]["#Uploaded_variation"]
+ .str.replace("_", ":")
+ .str.replace("/", ":")
+ .str.split(":", expand=True)
+ .values
+ )
+ assert vep_file.chrom.isna().sum() == 0
+ assert vep_file.pos.isna().sum() == 0
+ assert vep_file.ref.isna().sum() == 0
+ assert vep_file.alt.isna().sum() == 0
+ assert (
+ vep_file[["chrom", "pos", "ref", "alt"]].drop_duplicates().shape
+ == vcf_df[["chrom", "pos", "ref", "alt"]].drop_duplicates().shape
+ )
+
+ if "pos" in vep_file.columns:
+ vep_file["pos"] = vep_file["pos"].astype(int)
+
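+    # Normalize chromosome names so they always carry the 'chr' prefix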
+ vep_file["chrom"] = vep_file["chrom"].apply(
+ lambda x: "{}{}".format("chr", x.split("chr")[-1])
+ )
+
+ str_cols = [
"STRAND",
"TSL",
"GENE_PHENO",
+ "CADD_PHRED",
+ "CADD_RAW",
+ "SpliceAI_pred",
+ "BIOTYPE",
+ "Gene",
]
+ str_cols_present = [i for i in str_cols if i in vep_file.columns]
+ vep_file[str_cols_present] = vep_file[str_cols_present].astype(str)
- vep_float_cols = [
+ float_vals = [
"DISTANCE",
"gnomADg_FIN_AF",
"AF",
@@ -1170,89 +1794,131 @@ def process_vep(vep_file: object) -> object:
"TSL",
"Condel",
]
+ float_vals_present = [i for i in float_vals if i in vep_file.columns]
+ vep_file[float_vals_present] = (
+ vep_file[float_vals_present].replace("-", "NaN").astype(float)
+ )
+
+ necessary_columns = (
+ [
+ "chrom",
+ "pos",
+ "ref",
+ "alt",
+ "Gene",
+ "gnomADe_NFE_AF",
+ "CADD_PHRED",
+ "CADD_RAW",
+ "Consequence",
+ "PrimateAI",
+ "Alpha_Missense",
+ "am_pathogenicity",
+ "AbSplice_DNA",
+ "PolyPhen",
+ "SIFT",
+ "SIFT_score",
+ "PolyPhen_score",
+ "UKB_AF",
+ "combined_UKB_NFE_AF",
+ "combined_UKB_NFE_AF_MB",
+ "gene_id",
+ "Condel",
+ ]
+ + str_cols
+ + float_vals
+ + (vepcols_to_retain or [])
+ )
+ necessary_columns_present = [i for i in necessary_columns if i in vep_file.columns]
+
+ vep_file = vep_file[list(set(necessary_columns_present))]
+
+ if "SpliceAI_pred" in vep_file.columns:
+ vep_file["SpliceAI_delta_score"] = vep_file["SpliceAI_pred"].apply(
+ calculate_scores_max
+ )
+
+ if "Consequence" in vep_file.columns:
+ dummies = (
+ vep_file["Consequence"].str.get_dummies(",").add_prefix("Consequence_")
+ )
+ else:
+ raise ValueError("'Consequence' column expected to be in VEP output")
all_consequences = [
- "Consequence_3_prime_UTR_variant" "Consequence_5_prime_UTR_variant",
- "Consequence_NMD_transcript_variant",
+ "Consequence_splice_acceptor_variant",
+ "Consequence_5_prime_UTR_variant",
"Consequence_TFBS_ablation",
- "Consequence_TF_binding_site_variant",
- "Consequence_coding_sequence_variant",
- "Consequence_downstream_gene_variant",
- "Consequence_frameshift_variant",
+ "Consequence_start_lost",
"Consequence_incomplete_terminal_codon_variant",
- "Consequence_inframe_deletion",
- "Consequence_inframe_insertion",
- "Consequence_intergenic_variant",
"Consequence_intron_variant",
- "Consequence_mature_miRNA_variant",
- "Consequence_missense_variant",
- "Consequence_non_coding_transcript_exon_variant",
- "Consequence_non_coding_transcript_variant",
- "Consequence_protein_altering_variant",
- "Consequence_regulatory_region_variant",
- "Consequence_splice_acceptor_variant",
+ "Consequence_stop_gained",
"Consequence_splice_donor_5th_base_variant",
- "Consequence_splice_donor_region_variant",
+ "Consequence_downstream_gene_variant",
+ "Consequence_intergenic_variant",
"Consequence_splice_donor_variant",
+ "Consequence_NMD_transcript_variant",
+ "Consequence_protein_altering_variant",
"Consequence_splice_polypyrimidine_tract_variant",
- "Consequence_splice_region_variant",
- "Consequence_start_lost",
- "Consequence_start_retained_variant",
- "Consequence_stop_gained",
+ "Consequence_inframe_insertion",
+ "Consequence_mature_miRNA_variant",
+ "Consequence_synonymous_variant",
+ "Consequence_regulatory_region_variant",
+ "Consequence_non_coding_transcript_exon_variant",
"Consequence_stop_lost",
+ "Consequence_TF_binding_site_variant",
+ "Consequence_splice_donor_region_variant",
"Consequence_stop_retained_variant",
- "Consequence_synonymous_variant",
+ "Consequence_splice_region_variant",
+ "Consequence_coding_sequence_variant",
"Consequence_upstream_gene_variant",
+ "Consequence_frameshift_variant",
+ "Consequence_start_retained_variant",
+ "Consequence_3_prime_UTR_variant",
+ "Consequence_inframe_deletion",
+ "Consequence_missense_variant",
+ "Consequence_non_coding_transcript_variant",
+ "Consequence_TFBS_amplification",
+ "Consequence_coding_transcript_variant",
+ "Consequence_feature_elongation",
+ "Consequence_feature_truncation",
+ "Consequence_regulatory_region_ablation",
+ "Consequence_regulatory_region_amplification",
+ "Consequence_sequence_variant",
+ "Consequence_transcript_ablation",
+ "Consequence_transcript_amplification",
]
-
- dummies = vep_file["Consequence"].str.get_dummies(",").add_prefix("Consequence_")
+ all_consequences = list(set(all_consequences))
mask = pd.DataFrame(
data=np.zeros(shape=(len(vep_file), len(all_consequences))),
columns=all_consequences,
- dtype="Int8",
+ dtype=float,
)
-
mask[list(dummies.columns)] = dummies
vep_file[mask.columns] = mask
- vep_file[vep_str_cols] = vep_file[vep_str_cols].astype(str)
- vep_file[vep_float_cols] = vep_file[vep_float_cols].astype(float)
- vep_file[all_consequences] = vep_file[all_consequences].astype("Int8")
-
return vep_file
@cli.command()
-@click.argument("pvcf_blocks_file", type=click.Path(exists=True))
-@click.argument("annotation_dir", type=click.Path(exists=True))
-@click.argument("filename_pattern", type=str)
+@click.argument("filenames", type=str)
@click.argument("out_file", type=click.Path())
-@click.option("--included-chromosomes", type=str)
def concat_annotations(
- pvcf_blocks_file: str,
- annotation_dir: str,
- filename_pattern: str,
+ filenames: str,
out_file: str,
- included_chromosomes: Optional[str],
):
- logger.info("reading pvcf block file")
- pvcf_blocks_df = pd.read_csv(
- pvcf_blocks_file,
- sep="\t",
- header=None,
- names=["Index", "Chromosome", "Block", "First position", "Last position"],
- dtype={"Chromosome": str},
- ).set_index("Index")
- if included_chromosomes is not None:
- included_chromosomes = [int(c) for c in included_chromosomes.split(",")]
- pvcf_blocks_df = pvcf_blocks_df[
- pvcf_blocks_df["Chromosome"].isin([str(c) for c in included_chromosomes])
- ]
- pvcf_blocks = zip(pvcf_blocks_df["Chromosome"], pvcf_blocks_df["Block"])
- annotation_dir = Path(annotation_dir)
- file_paths = [
- annotation_dir / filename_pattern.format(chr=p[0], block=p[1])
- for p in pvcf_blocks
- ]
+ """
+    Concatenate multiple annotation files into a single output file.
+
+ Parameters:
+    - filenames (str): Comma-separated list of paths to the annotation files to concatenate.
+ - out_file (str): Output file path.
+
+ Returns:
+ None
+
+ Example:
+ concat_annotations "annotations/chr1_block0_merged.parquet,annotations/chr1_block1_merged.parquet,annotations/chr1_block2_merged.parquet " "output.parquet")
+ """
+ file_paths = filenames.split(",")
for f in tqdm(file_paths):
logger.info(f"processing file {f}")
file = pd.read_parquet(f)
@@ -1275,5 +1941,165 @@ def concat_annotations(
raise ValueError
+@cli.command()
+@click.argument("genotype_file", type=click.Path(exists=True))
+@click.argument("variants_filepath", type=click.Path(exists=True))
+@click.argument("out_file", type=click.Path())
+def get_af_from_gt(genotype_file: str, variants_filepath: str, out_file: str):
+ """
+ Compute allele frequencies from genotype data.
+
+ Parameters:
+ - genotype_file (str): Path to the genotype file.
+ - variants_filepath (str): Path to the variants file.
+ - out_file (str): Output file path for storing allele frequencies.
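+
+    Example (illustrative file names):
+        $ python annotations.py get_af_from_gt genotypes.h5 variants.parquet af_df.parquet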
+ """
+ import h5py
+
+ variants = pd.read_parquet(variants_filepath)
+ max_variant_id = variants["id"].max()
+
+ logger.info("Computing allele frequencies")
+ variant_counts = np.zeros(max_variant_id + 1)
+ with h5py.File(genotype_file, "r") as f:
+ variant_matrix = f["variant_matrix"]
+ genotype_matrix = f["genotype_matrix"]
+ n_samples = variant_matrix.shape[0]
+ for i in trange(n_samples):
+ variants = variant_matrix[i]
+ mask = variants > 0
+ variants = variants[mask]
+ genotype = genotype_matrix[i]
+ genotype = genotype[mask]
+ variant_counts[variants] += genotype
+
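+    # Allele frequency: summed alternate allele counts divided by 2 * n_samples, assuming diploid genotypes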
+ af = variant_counts / (2 * n_samples)
+ af_df = pd.DataFrame({"id": np.arange(max_variant_id + 1), "af": af})
+ af_df.to_parquet(out_file)
+
+
+@cli.command()
+@click.argument("annotations_path", type=click.Path(exists=True))
+@click.argument("af_df_path", type=click.Path(exists=True))
+@click.argument("out_file", type=click.Path())
+def merge_af(annotations_path: str, af_df_path: str, out_file: str):
+ """
+ Merge allele frequency data into annotations and save to a file.
+
+ Parameters:
+ - annotations_path (str): Path to the annotations file.
+ - af_df_path (str): Path to the allele frequency DataFrame file.
+ - out_file (str): Path to the output file to save merged data.
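+
+    Example (illustrative file names):
+        $ python annotations.py merge_af annotations.parquet af_df.parquet annotations_with_af.parquet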
+ """
+ annotations_df = pd.read_parquet(annotations_path)
+ af_df = pd.read_parquet(af_df_path)
+ merged_df = annotations_df.merge(af_df, how="left", on="id")
+ merged_df.to_parquet(out_file)
+
+
+@cli.command()
+@click.argument("annotations_path", type=click.Path(exists=True))
+@click.argument("out_file", type=click.Path())
+def calculate_maf(annotations_path: str, out_file: str):
+ """
+ Calculate minor allele frequency (MAF) from allele frequency data in annotations.
+
+ Parameters:
+ - annotations_path (str): Path to the annotations file containing allele frequency data.
+ - out_file (str): Path to the output file to save the calculated MAF data.
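+
+    Example (illustrative file names):
+        $ python annotations.py calculate_maf annotations_with_af.parquet annotations_with_maf.parquet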
+ """
+ annotation_file = pd.read_parquet(annotations_path)
+ af = annotation_file["af"]
+ annotation_file = annotation_file.drop(
+ columns=["UKB_AF_MB", "UKB_MAF"], errors="ignore"
+ )
+ annotation_file["maf"] = af.apply(lambda x: min(x, 1 - x))
+ annotation_file["maf_mb"] = (af * (1 - af) + 1e-8) ** (-0.5)
+ annotation_file.to_parquet(out_file)
+
+
+@cli.command()
+@click.argument("gene_id_file", type=click.Path(exists=True))
+@click.argument("annotations_path", type=click.Path(exists=True))
+@click.argument("out_file", type=click.Path())
+def add_gene_ids(gene_id_file: str, annotations_path: str, out_file: str):
+ """
+    Add gene IDs to the annotations based on a gene ID mapping file.
+
+ Parameters:
+ - gene_id_file (str): Path to the gene ID mapping file.
+ - annotations_path (str): Path to the annotations file.
+    - out_file (str): Path to the output file to save the annotations with gene IDs.
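+
+    Example (illustrative file names):
+        $ python annotations.py add_gene_ids gene_id_mapping.parquet annotations.parquet annotations_with_gene_ids.parquet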
+ """
+ genes = pd.read_parquet(gene_id_file)
+ genes[["gene_base", "feature"]] = genes["gene"].str.split(".", expand=True)
+ genes.drop(columns=["feature", "gene", "gene_name", "gene_type"], inplace=True)
+ genes.rename(columns={"id": "gene_id"}, inplace=True)
+ annotations = pd.read_parquet(annotations_path)
+ len_anno = len(annotations)
+ annotations.rename(columns={"gene_id": "gene_base"}, inplace=True)
+ merged = annotations.merge(genes, on=["gene_base"], how="left")
+ assert len(merged) == len_anno
+ merged.to_parquet(out_file)
+
+
+@cli.command()
+@click.argument("gtf_filepath", type=click.Path(exists=True))
+@click.argument("out_file", type=click.Path())
+def create_gene_id_file(gtf_filepath: str, out_file: str):
+ """
+    Create a gene ID mapping file of protein-coding genes from the GTF file.
+
+ Parameters:
+ - gtf_filepath (str): Path to the GTF file.
+    - out_file (str): Path to save the output gene ID mapping file.
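+
+    Example (illustrative file names):
+        $ python annotations.py create_gene_id_file gencode_annotation.gtf.gz gene_id_mapping.parquet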
+ """
+ import pyranges as pr
+
+ gtf = pr.read_gtf(gtf_filepath)
+ gtf = gtf.as_df()
+ gtf = gtf.query(
+ "Feature =='gene' and gene_type=='protein_coding' and Source=='HAVANA'"
+ )
+ gtf = (
+ gtf[["gene_id", "gene_type", "gene_name"]]
+ .reset_index(drop=True)
+ .reset_index()
+ .rename(columns={"gene_id": "gene", "index": "id"})
+ )
+ gtf.to_parquet(out_file)
+
+
+@cli.command()
+@click.argument("annotation_columns_yaml_file", type=click.Path(exists=True))
+@click.argument("annotations_path", type=click.Path(exists=True))
+@click.argument("out_file", type=click.Path())
+def select_rename_fill_annotations(
+ annotation_columns_yaml_file: str, annotations_path: str, out_file: str
+):
+ """
+ Select, rename, and fill missing values in annotation columns based on a YAML configuration file.
+
+ Parameters:
+ - annotation_columns_yaml_file (str): Path to the YAML file containing name and fill value mappings.
+ - annotations_path (str): Path to the annotations file.
+ - out_file (str): Path to save the modified annotations file.
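+
+    Example (illustrative file names):
+        $ python annotations.py select_rename_fill_annotations annotation_columns.yaml annotations.parquet annotations_final.parquet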
+ """
+
+ logger.info(
+ f"reading in yaml file containing name and fill value mappings from {annotation_columns_yaml_file}"
+ )
+ prior_names, _, _, column_name_mapping, fill_value_mapping = readYamlColumns(
+ annotation_columns_yaml_file
+ )
+ key_cols = ["id", "gene_id"]
+ anno_df = pd.read_parquet(
+ annotations_path, columns=list(set(prior_names + key_cols))
+ )
+ anno_df.rename(columns=column_name_mapping, inplace=True)
+ anno_df.fillna(fill_value_mapping, inplace=True)
+ anno_df.to_parquet(out_file)
+
+
if __name__ == "__main__":
cli()
diff --git a/deeprvat/data/dense_gt.py b/deeprvat/data/dense_gt.py
index ad49e51b..42ce7831 100644
--- a/deeprvat/data/dense_gt.py
+++ b/deeprvat/data/dense_gt.py
@@ -107,7 +107,6 @@ def __init__(
cache_matrices: bool = False,
verbose: bool = False,
):
- sample_file = None # TODO delete this!
if verbose:
logger.setLevel(logging.DEBUG)
else:
@@ -363,7 +362,7 @@ def setup_phenotypes(
# account for the fact that genotypes.h5 and phenotype_df can have different
# orders of their samples
self.index_map_geno, _ = get_matched_sample_indices(
- samples_gt.astype(int), self.samples.astype(int)
+ samples_gt.astype(str), self.samples.astype(str)
)
# get_matched_sample_indices is a much, much faster implementation of the code below
# self.index_map_geno = [np.where(samples_gt.astype(int) == i) for i in self.samples.astype(int)]
diff --git a/deeprvat/deeprvat/association_testing_pretrained.snakefile b/deeprvat/deeprvat/association_testing_pretrained.snakefile
deleted file mode 100644
index 3cbbc952..00000000
--- a/deeprvat/deeprvat/association_testing_pretrained.snakefile
+++ /dev/null
@@ -1,211 +0,0 @@
-from pathlib import Path
-from typing import List
-
-configfile: 'config.yaml'
-
-debug_flag = config.get('debug', False)
-
-phenotypes = list(config['phenotypes'].keys())
-n_burden_chunks = config.get('n_burden_chunks', 1) if not debug_flag else 2
-n_regression_chunks = config.get('n_regression_chunks', 40) if not debug_flag else 2
-n_trials = config['hyperparameter_optimization']['n_trials']
-n_repeats = config['n_repeats']
-debug = '--debug ' if debug_flag else ''
-do_scoretest = '--do-scoretest ' if config.get('do_scoretest', False) else ''
-tensor_compression_level = config['training'].get('tensor_compression_level', 1)
-
-wildcard_constraints:
- repeat="\d+"
-
-rule all:
- input:
- "results.parquet",
- "pvals.parquet"
-
-rule evaluate:
- input:
- testing_associations = expand('{{phenotype}}/deeprvat/repeat_{repeat}/results/burden_associations_testing.parquet',
- repeat=range(n_repeats)),
- replication_associations = expand('{{phenotype}}/deeprvat/repeat_{repeat}/results/burden_associations_replication.parquet',
- repeat=range(n_repeats)),
- config = '{phenotype}/deeprvat/hpopt_config.yaml',
- baseline = lambda wildcards: [
- str(Path(r['base']) / wildcards.phenotype / r['type'] /
- 'eval/burden_associations_testing.parquet')
- for r in config['baseline_results']
- ]
- output:
- "results.parquet",
- "pvals.parquet"
- threads: 1
- shell:
- py + 'deeprvat_evaluate '
- + debug +
- '--correction-method FDR '
- '{input.associations} '
- '{input.config} '
- '{wildcards.phenotype}/deeprvat/eval'
-
-rule all_regression:
- input:
- expand('{phenotype}/deeprvat/repeat_{repeat}/results/burden_associations.parquet',
- phenotype=phenotypes, type=['deeprvat'], repeat=range(n_repeats)),
-
-rule combine_regression_chunks:
- input:
- expand('{{phenotype}}/{{type}}/repeat_{{repeat}}/results/burden_associations_{chunk}.parquet', chunk=range(n_regression_chunks)),
- output:
- '{phenotype}/{type}/repeat_{repeat}/results/burden_associations.parquet',
- threads: 1
- shell:
- 'deeprvat_associate combine-regression-results '
- '--model-name repeat_{wildcards.repeat} '
- '{input.testing} '
- '{output.testing}',
- ' && '.join([
- 'univariate_burden_regression.py combine-regression-results '
- '--model-name repeat_{wildcards.repeat} '
- '{input} '
- '{output}'
- ])
-
-rule regress:
- input:
- config = "{phenotype}/deeprvat/hpopt_config.yaml",
- chunks = lambda wildcards: expand(
- ('{{phenotype}}/{{type}}/burdens/chunk{chunk}.' +
- ("finished" if wildcards.phenotype == phenotypes[0] else "linked")),
- chunk=range(n_burden_chunks)
- ),
- phenotype_0_chunks = expand(
- phenotypes[0] + '/{{type}}/burdens/chunk{chunk}.finished',
- chunk=range(n_burden_chunks)
- ),
- output:
- temp('{phenotype}/{type}/repeat_{repeat}/results/burden_associations_{chunk}.parquet'),
- threads: 2
- shell:
- 'deeprvat_associate regress '
- + debug +
- '--chunk {wildcards.chunk} '
- '--n-chunks ' + str(n_regression_chunks) + ' '
- '--use-bias '
- '--repeat {wildcards.repeat} '
- + do_scoretest +
- '{input.config} '
- '{wildcards.phenotype}/{wildcards.type}/burdens ' #TODO make this w/o repeats
- '{wildcards.phenotype}/{wildcards.type}/repeat_{wildcards.repeat}/results'
-
-rule all_burdens:
- input:
- [
- (f'{p}/deeprvat/burdens/chunk{c}.' +
- ("finished" if p == phenotypes[0] else "linked"))
- for p in phenotypes
- for c in range(n_burden_chunks)
- ]
-
-rule link_burdens:
- priority: 1
- input:
- checkpoints = lambda wildcards: [
- f'models/repeat_{repeat}/best/bag_{bag}.ckpt'
- for repeat in range(n_repeats) for bag in range(n_bags)
- ],
- dataset = '{phenotype}/deeprvat/association_dataset.pkl',
- config = 'models/repeat_0/config.yaml' #TODO make this more generic
- output:
- '{phenotype}/deeprvat/burdens/chunk{chunk}.linked'
- threads: 8
- shell:
- ' && '.join([
- ('deeprvat_associate compute-burdens '
- + debug +
- ' --n-chunks '+ str(n_burden_chunks) + ' '
- f'--link-burdens ../../../{phenotypes[0]}/deeprvat/burdens/burdens.zarr '
- '--chunk {wildcards.chunk} '
- '--dataset-file {input.dataset} '
- '{input.config} '
- '{input.checkpoints} '
- '{wildcards.phenotype}/deeprvat/burdens'),
- 'touch {output}'
- ])
-
-rule compute_burdens:
- priority: 10
- input:
- checkpoints = lambda wildcards: [
- f'models/repeat_{repeat}/best/bag_{bag}.ckpt'
- for repeat in range(n_repeats) for bag in range(n_bags)
- ],
- dataset = '{phenotype}/deeprvat/association_dataset.pkl',
- config = 'models/repeat_0/config.yaml' #TODO make this more generic
- output:
- '{phenotype}/deeprvat/burdens/chunk{chunk}.finished'
- threads: 8
- shell:
- ' && '.join([
- ('deeprvat_associate compute-burdens '
- + debug +
- ' --n-chunks '+ str(n_burden_chunks) + ' '
- '--chunk {wildcards.chunk} '
- '--dataset-file {input.dataset} '
- '{input.config} '
- '{input.checkpoints} '
- '{wildcards.phenotype}/deeprvat/burdens'),
- 'touch {output}'
- ])
-
-rule all_association_dataset:
- input:
- expand('{phenotype}/deeprvat/association_dataset.pkl',
- phenotype=phenotypes)
-
-rule association_dataset:
- input:
- config = '{phenotype}/deeprvat/hpopt_config.yaml'
- output:
- '{phenotype}/deeprvat/association_dataset.pkl'
- threads: 4
- shell:
- 'deeprvat_associate make-dataset '
- + debug +
- '{input.config} '
- '{output}'
-
-rule reverse_models:
- input:
- checkpoints = lambda wildcards: [
- f'{training_phenotypes[wildcards.phenotype]}/deeprvat/repeat_{repeat}/models/best/bag_{bag}.ckpt'
- for repeat in range(n_repeats) for bag in range(n_bags)
- ],
- config = '{phenotype}/deeprvat/repeat_0/config.yaml',
- output:
- "{phenotype}/deeprvat/reverse_finished.tmp"
- threads: 4
- shell:
- " && ".join([
- ("deeprvat_associate reverse-models "
- "{input.config} "
- "{input.checkpoints}"),
- "touch {output}"
- ])
-
-rule all_config:
- input:
- config = expand('{phenotype}/deeprvat/hpopt_config.yaml',
- phenotype=phenotypes),
-
-rule config:
- input:
- config = 'config.yaml',
- output:
- config = '{phenotype}/deeprvat/hpopt_config.yaml',
- threads: 1
- shell:
- (
- 'deeprvat_config update-config '
- '--phenotype {wildcards.phenotype} '
- '{input.config} '
- '{output.config}'
- )
diff --git a/deeprvat/preprocessing/preprocess.py b/deeprvat/preprocessing/preprocess.py
index 46eba386..71745c13 100644
--- a/deeprvat/preprocessing/preprocess.py
+++ b/deeprvat/preprocessing/preprocess.py
@@ -2,6 +2,7 @@
import logging
import sys
import time
+from contextlib import ExitStack
from pathlib import Path
from typing import List, Optional, Tuple
@@ -9,7 +10,6 @@
import h5py
import numpy as np
import pandas as pd
-from joblib import Parallel, delayed
from tqdm import tqdm, trange
logging.basicConfig(
@@ -26,8 +26,11 @@ def drop_rows(df: pd.DataFrame, df_to_drop: pd.DataFrame) -> pd.DataFrame:
return df.drop_duplicates(keep=False)
-def ragged_to_matrix(rows: List[np.ndarray], pad_value: int = -1) -> np.ndarray:
- max_len = max([r.shape[0] for r in rows])
+def ragged_to_matrix(
+ rows: List[np.ndarray], pad_value: int = -1, max_len: Optional[int] = None
+) -> np.ndarray:
+ if max_len is None:
+ max_len = max([r.shape[0] for r in rows])
matrix = np.stack(
[
np.pad(
@@ -36,7 +39,7 @@ def ragged_to_matrix(rows: List[np.ndarray], pad_value: int = -1) -> np.ndarray:
"constant",
constant_values=(pad_value, pad_value),
)
- for r in tqdm(rows)
+ for r in tqdm(rows, desc="Ragged to matrix", file=sys.stdout)
]
)
return matrix
@@ -56,7 +59,8 @@ def process_sparse_gt_file(
)
sparse_gt = sparse_gt[sparse_gt["sample"].isin(samples)]
- sparse_gt = drop_rows(sparse_gt, calls_to_exclude)
+ if calls_to_exclude is not None:
+ sparse_gt = drop_rows(sparse_gt, calls_to_exclude)
try:
sparse_gt = pd.merge(sparse_gt, variants, validate="m:1")
@@ -97,10 +101,13 @@ def write_genotype_file(
samples: np.ndarray,
variant_matrix: np.ndarray,
genotype_matrix: np.ndarray,
+ count_variants: Optional[np.ndarray] = None,
):
f.create_dataset("samples", data=samples, dtype=h5py.string_dtype())
f.create_dataset("variant_matrix", data=variant_matrix, dtype=np.int32)
f.create_dataset("genotype_matrix", data=genotype_matrix, dtype=np.int8)
+ if count_variants is not None:
+ f.create_dataset("count_variants", data=count_variants, dtype=np.int32)
@click.group()
@@ -112,18 +119,29 @@ def cli():
@click.argument("variant_file", type=click.Path(exists=True))
@click.argument("out_file", type=click.Path(writable=True))
@click.argument("duplicates_file", type=click.Path(writable=True))
-def add_variant_ids(variant_file: str, out_file: str, duplicates_file: str):
+@click.option("--chromosomes", type=str)
+def add_variant_ids(
+ variant_file: str,
+ out_file: str,
+ duplicates_file: str,
+ chromosomes: Optional[str] = None,
+):
variants = pd.read_csv(
variant_file, sep="\t", names=["chrom", "pos", "ref", "alt"], index_col=False
)
+ if chromosomes is not None:
+ logging.info(f"Filtering variants based on chromosomes. Keeping {chromosomes}")
+ chromosomes = [f"chr{chrom}" for chrom in chromosomes.split(",")]
+ variants = variants[variants["chrom"].isin(chromosomes)]
+
duplicates = variants[variants.duplicated()]
if Path(duplicates_file).suffix == ".parquet":
- logging.info(f"Writing duplicates in parquet format")
+ logging.info("Writing duplicates in parquet format")
duplicates.to_parquet(duplicates_file, index=False)
else:
- logging.info(f"Writing duplicates in tsv format")
+ logging.info("Writing duplicates in tsv format")
duplicates.to_csv(duplicates_file, sep="\t", header=False, index=False)
logging.info(f"Wrote {len(duplicates)} duplicates to {duplicates_file}")
@@ -133,10 +151,10 @@ def add_variant_ids(variant_file: str, out_file: str, duplicates_file: str):
variants["id"] = range(len(variants))
if Path(out_file).suffix == ".parquet":
- logging.info(f"Writing duplicates in parquet format")
+ logging.info("Writing duplicates in parquet format")
variants.to_parquet(out_file, index=False)
else:
- logging.info(f"Writing duplicates in tsv format")
+ logging.info("Writing duplicates in tsv format")
variants.to_csv(out_file, sep="\t", index=False)
logging.info(
@@ -157,50 +175,95 @@ def get_file_chromosome(file, col_names, chrom_field="chrom"):
return chrom
+def parse_file_path_list(file_path_list_path: Path):
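+    """Read a text file listing VCF paths (one per line); return their stems, the paths, and a stem-to-path lookup."""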
+ with open(file_path_list_path) as file:
+ vcf_files = [Path(line.rstrip()) for line in file]
+ vcf_stems = [vf.stem.replace(".vcf", "") for vf in vcf_files]
+
+ assert len(vcf_stems) == len(vcf_files)
+
+ vcf_look_up = {stem: file for stem, file in zip(vcf_stems, vcf_files)}
+
+ return vcf_stems, vcf_files, vcf_look_up
+
+
@cli.command()
+@click.option("--threshold", type=float, default=0.1)
+@click.argument("file-paths-list", type=click.Path(exists=True))
+@click.argument("imiss-dir", type=click.Path(exists=True))
+@click.argument("out-file", type=click.Path())
+def process_individual_missingness(
+ threshold: float, file_paths_list: Path, imiss_dir: str, out_file: str
+):
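+    """
+    Aggregate per-sample missingness across VCF blocks and write samples exceeding the threshold.
+
+    For each VCF stem listed in file_paths_list, per-sample missing-call counts are read from
+    <imiss_dir>/samples/<stem>.tsv and the number of sites from <imiss_dir>/sites/<stem>.tsv.
+    Counts are summed per sample, divided by the total number of sites, and samples with a
+    missingness fraction >= threshold are written to out_file (one sample ID per line).
+    """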
+ vcf_stems, _, _ = parse_file_path_list(file_paths_list)
+
+ imiss_dir = Path(imiss_dir)
+
+ imiss_blocks = []
+ total_variants = 0
+ for vcf_stem in tqdm(vcf_stems, desc="VCFs"):
+ missing_counts = pd.read_csv(
+ imiss_dir / "samples" / f"{vcf_stem}.tsv",
+ sep="\t",
+ header=None,
+ usecols=[1, 11],
+ )
+ missing_counts.columns = ["sample", "n_missing"]
+ imiss_blocks.append(missing_counts)
+ total_variants += pd.read_csv(
+ imiss_dir / "sites" / f"{vcf_stem}.tsv",
+ header=None,
+ sep="\t",
+ ).iloc[0, 1]
+
+ imiss = pd.concat(imiss_blocks, ignore_index=True)
+ sample_groups = imiss.groupby("sample")
+ sample_counts = sample_groups.agg(np.sum).reset_index()
+ sample_counts["missingness"] = sample_counts["n_missing"] / total_variants
+ sample_counts = sample_counts.loc[
+ sample_counts["missingness"] >= threshold, ["sample", "missingness"]
+ ]
+ sample_counts[["sample"]].to_csv(out_file, index=False, header=None)
+
+
+@cli.command()
+@click.option("--chunksize", type=int, default=1000)
@click.option("--exclude-variants", type=click.Path(exists=True), multiple=True)
@click.option("--exclude-samples", type=click.Path(exists=True))
@click.option("--exclude-calls", type=click.Path(exists=True))
@click.option("--chromosomes", type=str)
-@click.option("--threads", type=int, default=1)
@click.option("--skip-sanity-checks", is_flag=True)
@click.argument("variant-file", type=click.Path(exists=True))
-@click.argument("samples", type=click.Path(exists=True))
+@click.argument("samples-path", type=click.Path(exists=True))
@click.argument("sparse-gt", type=click.Path(exists=True))
@click.argument("out-file", type=click.Path())
def process_sparse_gt(
+ chunksize: int,
exclude_variants: List[str],
exclude_samples: Optional[str],
exclude_calls: Optional[str],
chromosomes: Optional[str],
- threads: int,
skip_sanity_checks: bool,
variant_file: str,
- samples: str,
+ samples_path: str,
sparse_gt: str,
out_file: str,
):
- logging.info("Reading variants...")
+ logging.info("Reading and processing variants...")
start_time = time.time()
-
- variants = pd.read_table(variant_file, engine="pyarrow")
-
- # Filter all variants based on chromosome
+ variants = pd.read_parquet(variant_file, engine="pyarrow")
if chromosomes is not None:
chromosomes = [f"chr{chrom}" for chrom in chromosomes.split(",")]
variants = variants[variants["chrom"].isin(chromosomes)]
total_variants = len(variants)
-
if len(exclude_variants) > 0:
variant_exclusion_files = [
v for directory in exclude_variants for v in Path(directory).rglob("*.tsv*")
]
-
- exclusion_file_cols = ["chrom", "pos", "ref", "alt"]
variants_to_exclude = pd.concat(
[
- pd.read_csv(v, sep="\t", names=exclusion_file_cols)
- for v in tqdm(variant_exclusion_files)
+ pd.read_csv(v, sep="\t", names=["chrom", "pos", "ref", "alt"])
+ for v in tqdm(variant_exclusion_files, file=sys.stdout)
],
ignore_index=True,
)
@@ -214,76 +277,55 @@ def process_sparse_gt(
)["id"]
variants = variants[~variants["id"].isin(variant_ids_to_exclude)]
if not skip_sanity_checks:
- try:
- assert total_variants - len(variants) == len(variants_to_exclude)
- except:
- import ipdb
+ assert total_variants - len(variants) == len(variants_to_exclude)
- ipdb.set_trace()
logging.info(f"Dropped {total_variants - len(variants)} variants")
logging.info(f"...done ({time.time() - start_time} s)")
logging.info("Processing samples")
- samples = set(pd.read_csv(samples, header=None).loc[:, 0])
+ samples = set(pd.read_csv(samples_path, header=None).loc[:, 0])
if exclude_samples is not None:
total_samples = len(samples)
if sample_exclusion_files := list(Path(exclude_samples).rglob("*.csv")):
- samples_to_exclude = set(
- pd.concat(
- [
- pd.read_csv(s, header=None).loc[:, 0]
- for s in sample_exclusion_files
- ],
- ignore_index=True,
+
+ sample_exclusion_files = [
+ s for s in sample_exclusion_files if s.stat().st_size > 0
+ ]
+ if sample_exclusion_files:
+ logging.info(
+ f"Found {len(sample_exclusion_files)} sample exclusion files"
)
- )
+ samples_to_exclude = set(
+ pd.concat(
+ [
+ pd.read_csv(s, header=None).loc[:, 0]
+ for s in sample_exclusion_files
+ ],
+ ignore_index=True,
+ )
+ )
+ else:
+ samples_to_exclude = set()
samples -= samples_to_exclude
logging.info(f"Dropped {total_samples - len(samples)} samples")
else:
logging.info(f"Found no samples to exclude in {exclude_samples}")
- samples = list(samples)
+ samples = sorted(list(samples))
logging.info("Processing sparse GT files by chromosome")
total_calls_dropped = 0
variant_groups = variants.groupby("chrom")
logger.info(f"variant groups are {variant_groups.groups.keys()}")
- for chrom in tqdm(variant_groups.groups.keys()):
+ for chrom in tqdm(
+ variant_groups.groups.keys(), desc="Chromosomes", file=sys.stdout
+ ):
logging.info(f"Processing chromosome {chrom}")
logging.info("Reading in filtered calls")
- exclude_calls_file_cols = ["chrom", "pos", "ref", "alt", "sample"]
-
- calls_to_exclude = pd.DataFrame(columns=exclude_calls_file_cols)
-
if exclude_calls is not None:
exclude_calls_chrom = Path(exclude_calls).rglob("*.tsv*")
- exclude_calls_chrom_files = []
- for exclude_call_file in exclude_calls_chrom:
- file_chrom = get_file_chromosome(
- exclude_call_file, col_names=exclude_calls_file_cols
- )
-
- if file_chrom == chrom:
- exclude_calls_chrom_files.append(exclude_call_file)
-
- if exclude_calls_chrom_files:
- calls_to_exclude = pd.concat(
- [
- pd.read_csv(
- c,
- names=exclude_calls_file_cols,
- sep="\t",
- index_col=None,
- )
- for c in tqdm(exclude_calls_chrom_files)
- ],
- ignore_index=True,
- )
- calls_dropped = len(calls_to_exclude)
- total_calls_dropped += calls_dropped
- logging.info(f"Dropped {calls_dropped} calls")
logging.info("Processing sparse GT files")
@@ -297,83 +339,185 @@ def process_sparse_gt(
if get_file_chromosome(f, col_names=sparse_file_cols) == chrom
]
- logging.info(f"sparse gt chrom(es) are: {sparse_gt_chrom}")
-
- processed = Parallel(n_jobs=threads, verbose=50)(
- delayed(process_sparse_gt_file)(
- f.as_posix(), variants_chrom, samples, calls_to_exclude
- )
- for f in sparse_gt_chrom
+ logging.info(
+ f"{len(sparse_gt_chrom)} sparse GT files: {[str(s) for s in sparse_gt_chrom]}"
)
- postprocessed_gt = postprocess_sparse_gt(processed, 1, len(samples))
- postprocessed_variants = postprocess_sparse_gt(processed, 0, len(samples))
+ variant_ids = [np.array([], dtype=np.int32) for _ in samples]
+ genotypes = [np.array([], dtype=np.int8) for _ in samples]
+ for sparse_gt_file in tqdm(sparse_gt_chrom, desc="Files", file=sys.stdout):
+ # Load calls to exclude that correspond to current file
+ file_stem = sparse_gt_file
+ while str(file_stem).find(".") != -1:
+ file_stem = Path(file_stem.stem)
+
+ calls_to_exclude = None
+ if exclude_calls is not None:
+ exclude_call_files = [
+ f for f in exclude_calls_chrom if f.name.startswith(str(file_stem))
+ ]
+ if len(exclude_call_files) > 0:
+ calls_to_exclude = pd.concat(
+ [
+ pd.read_csv(
+ c,
+ names=["chrom", "pos", "ref", "alt", "sample"],
+ sep="\t",
+ index_col=None,
+ )
+ for c in tqdm(
+ exclude_call_files,
+ desc="Filtered calls",
+ file=sys.stdout,
+ )
+ ],
+ ignore_index=True,
+ )
+
+ calls_dropped = len(calls_to_exclude)
+ total_calls_dropped += calls_dropped
+ logging.info(f"Dropped {calls_dropped} calls")
+
+ # Load sparse_gt_file as data frame, add variant IDs, remove excluded calls
+ this_variant_ids, this_genotypes = process_sparse_gt_file(
+ sparse_gt_file.as_posix(), variants_chrom, samples, calls_to_exclude
+ )
+ assert len(this_variant_ids) == len(samples)
+ assert len(this_genotypes) == len(samples)
- logging.info("Ordering gt and variant matrices")
- order = [np.argsort(i) for i in postprocessed_variants]
+ # Concatenate to existing results
+ for i, (v, g) in enumerate(zip(this_variant_ids, this_genotypes)):
+ variant_ids[i] = np.append(variant_ids[i], v)
+ genotypes[i] = np.append(genotypes[i], g)
- postprocessed_variants = [
- postprocessed_variants[i][order[i]]
- for i in range(len(postprocessed_variants))
- ]
- postprocessed_gt = [
- postprocessed_gt[i][order[i]] for i in range(len(postprocessed_gt))
- ]
+ gc.collect()
- logging.info("Preparing GT arrays for storage")
-
- logger.info("padding ragged matrix")
- variant_matrix = ragged_to_matrix(postprocessed_variants)
- gt_matrix = ragged_to_matrix(postprocessed_gt)
+ gc.collect()
out_file_chrom = f"{out_file}_{chrom}.h5"
Path(out_file_chrom).parents[0].mkdir(exist_ok=True, parents=True)
logging.info(f"Writing to {out_file_chrom}")
+ count_variants = np.array([g.shape[0] for g in genotypes])
+ max_len = np.max(count_variants)
with h5py.File(out_file_chrom, "w") as f:
sample_array = np.array(samples, dtype="S")
- write_genotype_file(f, sample_array, variant_matrix, gt_matrix)
+ f.create_dataset("samples", data=sample_array, dtype=h5py.string_dtype())
+ f.create_dataset("variant_matrix", (len(samples), max_len), dtype=np.int32)
+ f.create_dataset("genotype_matrix", (len(samples), max_len), dtype=np.int8)
+ f.create_dataset("count_variants", data=count_variants, dtype=np.int32)
+
+ # Operate in chunks to reduce memory usage
+ for start_sample in trange(
+ 0, len(samples), chunksize, desc="Chunks", file=sys.stdout
+ ):
+ end_sample = min(start_sample + chunksize, len(samples))
+
+ order = [np.argsort(v) for v in variant_ids[start_sample:end_sample]]
+
+ this_variant_ids = [
+ variant_ids[i][order[i - start_sample]]
+ for i in range(start_sample, end_sample)
+ ]
+ this_genotypes = [
+ genotypes[i][order[i - start_sample]]
+ for i in range(start_sample, end_sample)
+ ]
+
+ gc.collect()
+
+ variant_matrix = ragged_to_matrix(this_variant_ids, max_len=max_len)
+ gt_matrix = ragged_to_matrix(this_genotypes, max_len=max_len)
+
+ f["variant_matrix"][start_sample:end_sample] = variant_matrix
+ f["genotype_matrix"][start_sample:end_sample] = gt_matrix
+
+ del variant_matrix
+ del gt_matrix
+ gc.collect()
logging.info(f"Dropped {total_calls_dropped} calls in total")
logging.info("Finished!")
@cli.command()
+@click.option("--chunksize", type=int)
@click.argument("genotype-files", nargs=-1, type=click.Path(exists=True))
@click.argument("out-file", type=click.Path())
-def combine_genotypes(genotype_files: List[str], out_file: str):
+def combine_genotypes(
+ chunksize: Optional[int], genotype_files: List[str], out_file: str
+):
with h5py.File(genotype_files[0]) as f:
samples = f["samples"][:]
- n_samples = samples.shape[0]
- variant_matrix_list: List[List] = [[] for _ in range(n_samples)]
- genotype_matrix_list: List[List] = [[] for _ in range(n_samples)]
- mat_idx_offset = 0
- for file in tqdm(genotype_files, desc="Files"):
+ for gt_file in genotype_files:
+ with h5py.File(gt_file) as f:
+ if not np.array_equal(samples, f["samples"][:]):
+ raise ValueError(
+ f"Error when processing {gt_file}: "
+ "All genotype files must contain the same samples in the same order"
+ )
+
+ n_samples: int = samples.shape[0]
+
+ if chunksize is None:
+ chunksize = n_samples
+
+ count_variants = np.zeros(n_samples, dtype=np.int32)
+ for file in genotype_files:
with h5py.File(file) as f:
- var_matrix = f["variant_matrix"][:]
- gt_matrix = f["genotype_matrix"][:]
- for i in trange(n_samples, desc="Samples"):
- this_var = var_matrix[i, var_matrix[i, :] != -1]
- this_gt = gt_matrix[i, gt_matrix[i, :] != -1]
- assert this_var.shape == this_gt.shape
-
- variant_matrix_list[i].append(this_var + mat_idx_offset)
- genotype_matrix_list[i].append(this_gt)
-
- ragged_variant_matrix = [np.concatenate(vm) for vm in tqdm(variant_matrix_list)]
- del variant_matrix_list
- gc.collect()
- ragged_genotype_matrix = [np.concatenate(gt) for gt in tqdm(genotype_matrix_list)]
- del genotype_matrix_list
- gc.collect()
- variant_matrix = ragged_to_matrix(ragged_variant_matrix)
- del ragged_variant_matrix
- gc.collect()
- genotype_matrix = ragged_to_matrix(ragged_genotype_matrix)
- del ragged_genotype_matrix
- gc.collect()
- with h5py.File(out_file, "a") as f:
- write_genotype_file(f, samples, variant_matrix, genotype_matrix)
+ count_variants += f["count_variants"][:]
+
+ max_n_variants = int(np.max(count_variants))
+
+ with h5py.File(out_file, "w") as f:
+ f.create_dataset("samples", data=samples, dtype=h5py.string_dtype())
+ f.create_dataset("variant_matrix", (n_samples, max_n_variants), dtype=np.int32)
+ f.create_dataset("genotype_matrix", (n_samples, max_n_variants), dtype=np.int8)
+
+ with h5py.File(out_file, "r+") as g:
+ for start_sample in trange(
+ 0, n_samples, chunksize, desc="Chunks", file=sys.stdout
+ ):
+ end_sample = min(start_sample + chunksize, n_samples)
+ this_chunksize = end_sample - start_sample
+
+ # Use ExitStack so we don't have to reopen every HDF5 file for each sample
+ variant_matrix_list = [[] for _ in range(this_chunksize)]
+ genotype_matrix_list = [[] for _ in range(this_chunksize)]
+ with ExitStack() as stack:
+ genotype_file_objs = [
+ stack.enter_context(h5py.File(g)) for g in genotype_files
+ ]
+ for f in tqdm(genotype_file_objs, desc="Files", file=sys.stdout):
+ var_matrix = f["variant_matrix"][start_sample:end_sample, :]
+ gt_matrix = f["genotype_matrix"][start_sample:end_sample, :]
+ for i in trange(this_chunksize, desc="Samples", file=sys.stdout):
+ this_var = var_matrix[i, var_matrix[i, :] != -1]
+ this_gt = gt_matrix[i, gt_matrix[i, :] != -1]
+ assert this_var.shape == this_gt.shape
+
+ variant_matrix_list[i].append(this_var)
+ genotype_matrix_list[i].append(this_gt)
+
+ ragged_variant_matrix = [np.concatenate(vm) for vm in variant_matrix_list]
+ del variant_matrix_list
+ gc.collect()
+ ragged_genotype_matrix = [np.concatenate(gt) for gt in genotype_matrix_list]
+ del genotype_matrix_list
+ gc.collect()
+ variant_matrix = ragged_to_matrix(
+ ragged_variant_matrix, max_len=max_n_variants
+ )
+ del ragged_variant_matrix
+ gc.collect()
+ genotype_matrix = ragged_to_matrix(
+ ragged_genotype_matrix, max_len=max_n_variants
+ )
+ del ragged_genotype_matrix
+ gc.collect()
+
+ g["variant_matrix"][start_sample:end_sample, :] = variant_matrix
+ g["genotype_matrix"][start_sample:end_sample, :] = genotype_matrix
if __name__ == "__main__":
diff --git a/deeprvat/seed_gene_discovery/lsf.yaml b/deeprvat/seed_gene_discovery/lsf.yaml
deleted file mode 100644
index 808f07e8..00000000
--- a/deeprvat/seed_gene_discovery/lsf.yaml
+++ /dev/null
@@ -1,48 +0,0 @@
-__default__:
- - "-q medium"
-
-config:
- - "-q short"
-
-plof_config:
- - "-q short"
-
-# training_dataset_pickle:
-# - "-q short"
-
-delete_burden_cache:
- - "-q short"
-
-best_cv_run:
- - "-q short"
-
-train_cv:
- - "-q gpu"
- - "-gpu num=1:j_exclusive=yes:mode=exclusive_process:gmem=15.7G"
- # - "-R tensorcore"
- # - "-L /bin/bash"
- # - >-
- # -R "select[hname!='e230-dgx2-1' && hname!='e230-dgx2-2']"
-
-compute_burdens:
- - "-q gpu"
- - "-gpu num=1:j_exclusive=yes:mode=exclusive_process:gmem=15.7G"
- # - "-R tensorcore"
- # - "-L /bin/bash"
- # - >-
- # -R "select[hname!='e230-dgx2-1' && hname!='e230-dgx2-2']"
-
-compute_plof_burdens:
- - "-q medium"
-
-sample_splits:
- - "-q medium"
-
-regress:
- - "-q long"
-
-combine_regression_chunks:
- - "-q short"
-
-# plof_dataset:
-# - "-q short"
diff --git a/deeprvat_annotations.yml b/deeprvat_annotations.yml
index 0c4b6896..47f6e1f5 100644
--- a/deeprvat_annotations.yml
+++ b/deeprvat_annotations.yml
@@ -16,7 +16,10 @@ dependencies:
- tensorflow=2.11.0
- pyarrow=11.0.0
- fastparquet=2023.4.0
+ - bioconda::pyranges=0.0.129
+ - pytest=8.2.0
+ - pytest-xdist=3.5.0
#comment out lines below if you want to use preinstalled bcftools or samtools
- bcftools=1.17
- samtools=1.17
- - ensembl-vep=110.1
\ No newline at end of file
+ - ensembl-vep=110.1
diff --git a/deeprvat_env_no_gpu.yml b/deeprvat_env_no_gpu.yml
index 31fb7d70..54a16d4a 100644
--- a/deeprvat_env_no_gpu.yml
+++ b/deeprvat_env_no_gpu.yml
@@ -9,6 +9,7 @@ dependencies:
- dask=2023.5
- fastparquet=0.5
- h5py=3.1
+ - mkl!=2024.1.0
- numcodecs=0.11
- numpy=1.21
- optuna=2.10
@@ -32,6 +33,8 @@ dependencies:
- parallel=20230922
- pip=23.1.2
- plotnine=0.10.1
+ - pytest=8.2.0
+ - pytest-xdist=3.5.0
- pip:
- git+https://github.com/HealthML/seak@v0.4.3
- bgen==1.6.3
diff --git a/deeprvat_preprocessing_env.yml b/deeprvat_preprocessing_env.yml
index bbf17e8b..3b3a2758 100644
--- a/deeprvat_preprocessing_env.yml
+++ b/deeprvat_preprocessing_env.yml
@@ -6,11 +6,10 @@ dependencies:
- python=3.8
- click=8.1.3
- h5py=3.8.0
- - joblib=1.0.1
- numpy=1.21.2
- pandas=1.5.1
- tqdm=4.59.0
- - pytest=7.3.1
+ - pytest=8.2.0
- pyarrow=11.0.0
- snakemake=7.17.1
- bcftools=1.17
diff --git a/docs/Makefile b/docs/Makefile
index d4bb2cbb..70d18002 100644
--- a/docs/Makefile
+++ b/docs/Makefile
@@ -14,6 +14,14 @@ help:
.PHONY: help Makefile
+
+
+.PHONY: generate-doc-graphs
+generate-doc-graphs:
+ bash generate-doc-graphs.sh
+
+
+
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
diff --git a/docs/_static/annotation_rulegraph.svg b/docs/_static/annotation_rulegraph.svg
new file mode 100644
index 00000000..7366a07c
--- /dev/null
+++ b/docs/_static/annotation_rulegraph.svg
@@ -0,0 +1,263 @@
+
+
+
+
+
diff --git a/docs/_static/preprocess_rulegraph_no_qc.svg b/docs/_static/preprocess_no_qc_rulegraph.svg
similarity index 65%
rename from docs/_static/preprocess_rulegraph_no_qc.svg
rename to docs/_static/preprocess_no_qc_rulegraph.svg
index 3edc0a0e..c631689f 100644
--- a/docs/_static/preprocess_rulegraph_no_qc.svg
+++ b/docs/_static/preprocess_no_qc_rulegraph.svg
@@ -1,7 +1,7 @@
-