diff --git a/Cargo.toml b/Cargo.toml index 9aab392..047ff6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,15 +26,12 @@ clap = { version = "4.5.16", features = ["derive"] } [[bin]] -name = "fava" +name = "favabean" path = "src/main.rs" -[[bin]] -name = "bam2bed" -path = "src/bam2bed/main.rs" - - - +# [[bin]] +# name = "bam2bed" +# path = "src/bam2bed/main.rs" # anyhow = "1.0.86" # clap = { version = "4.5.16", features = ["derive"] } diff --git a/README.md b/README.md index 5e844bb..7a2b8d7 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,10 @@ -# FAVA BEAN +# FAVA BEANS + +Filtering Alignments to expedite Bayesian Analysis in Base-pair-resolution Editing site Aggregation and Sifting + +Filtering Alignments to expedite Variant Associations in Base-pair-resolution Editing site Aggregation and Sifting + + + -Filtering Accessibility with Variant Alignment for Bayesian Epitranscriptomic Analysis diff --git a/src/aggregate.rs b/src/aggregate.rs new file mode 100644 index 0000000..74e51cc --- /dev/null +++ b/src/aggregate.rs @@ -0,0 +1,217 @@ +use super::AggregateArgs; +use anyhow; +use rayon::prelude::*; +use rust_htslib::bam::ext::BamRecordExtensions; +use rust_htslib::bam::{self, Read}; +use std::cmp::{max, min}; +use std::sync::{Arc, Mutex}; +use std::{str, thread}; + +use crate::util::check_bam_index; + +pub fn run_aggregate(args: &AggregateArgs) -> anyhow::Result<()> { + // Visit all the alignments and figure out + + let nthread_max = thread::available_parallelism() + .expect("failed to figure out number of cores") + .get(); + + let nthread = match args.threads { + Some(x) => min(nthread_max, x), + None => nthread_max, + }; + + let (bam_file_bg, bam_file_fg) = (args.bg_bam.as_ref(), args.fg_bam.as_ref()); + + check_bam_index(bam_file_bg, args.bg_bai.as_deref()) + .expect("check index for the background BAM"); + check_bam_index(bam_file_fg, args.fg_bai.as_deref()) + .expect("check index for the foreground BAM"); + + let block_size = match args.bsize { + Some(bs) => bs, + _ => 10_000, + } as i64; + + let mut jobs = vec![]; + + let br = bam::Reader::from_path(bam_file_fg)?; + let hdr = br.header(); + + for (tid, name) in hdr.target_names().iter().enumerate() { + let max_size = hdr.target_len(tid as u32).unwrap() as i64; + let chr_name = Box::new(str::from_utf8(name).unwrap()); + jobs.push((chr_name, make_blocks(max_size, block_size))); + } + + // shared index reader + let arc_bam_bg = Arc::new(Mutex::new(bam::IndexedReader::from_path(bam_file_bg)?)); + + let arc_bam_fg = Arc::new(Mutex::new(bam::IndexedReader::from_path(bam_file_fg)?)); + + rayon::ThreadPoolBuilder::new() + .num_threads(nthread as usize) + .build_global() + .unwrap(); + + for (chr, blocks) in jobs.iter() { + // + let chr_name = *(chr.as_ref()); + + dbg!(chr_name); + + let _ = blocks.par_iter().map(|(lb, ub)| { + let region = (chr_name, *lb, *ub); + let bg = get_dna_freq(&arc_bam_bg, region); + let fg = get_dna_freq(&arc_bam_fg, region); + match (bg, fg) { + (Ok(freq_bg), Ok(freq_fg)) => { + freq_bg.forward; + freq_bg.reverse; + freq_fg.forward; + freq_fg.reverse; + // + } + _ => { + // do nothing --> ignore errors + } + } + }); + + // blocks.par_bridge(); + } + + Ok(()) +} + +fn make_blocks(max_size: i64, block_size: i64) -> Vec<(i64, i64)> { + let mut jobs = vec![]; + for lb in (0..max_size).step_by(block_size as usize) { + let ub = min(max_size, lb + block_size); + jobs.push((lb, ub)); + } + return jobs; +} + +/////////////////////////// +// DNA frequency vectors // +/////////////////////////// + +#[derive(Debug)] +struct DnaFreq { + a: usize, // number of A's + t: usize, // number of T's + g: usize, // number of G's + c: usize, // number of C's + tot: usize, // total + gpos: i64, // genomic position +} + +struct DnaFreqVecs { + forward: Vec, + reverse: Vec, +} + +//////////////////////////////////////////// +// Extract DNA base pair frequency tables // +//////////////////////////////////////////// + +fn get_dna_freq( + arc_bam: &Arc>, + region: (&str, i64, i64), +) -> anyhow::Result { + let (_, lb, ub) = region; + + let mut bam_reader = arc_bam.lock().expect("unable to lock the reader"); + + bam_reader + .fetch(region) + .expect("unable to fetch the region"); + + if lb >= ub { + return Err(anyhow::anyhow!("lb >= ub")); + } + + let nn = max(ub - lb, 0i64) as usize; + let mut reverse_freq = Vec::with_capacity(nn); + let mut forward_freq = Vec::with_capacity(nn); + + for g in lb..ub { + forward_freq.push(DnaFreq { + a: 0, + t: 0, + g: 0, + c: 0, + tot: 0, + gpos: g, + }); + reverse_freq.push(DnaFreq { + a: 0, + t: 0, + g: 0, + c: 0, + tot: 0, + gpos: g, + }); + } + + // Iter aligned read and reference positions on a basepair level + // https://docs.rs/rust-htslib/latest/src/rust_htslib/bam/ext.rs.html#135 + // [read_pos, genome_pos] + + for rr in bam_reader.rc_records() { + match rr { + Ok(rec) => { + if rec.is_duplicate() { + continue; + } + + // TODO: cell barcode umi + // extract 10x cell barcode + // if let Ok(cb) = rec.aux(b"CB") { + // dbg!(x); + // } + + // extract 10x UMI barcode + // if let Ok(umi) = rec.aux(b"UB") { + // dbg!(x); + // } + + let seq = rec.seq().as_bytes(); + + for [rpos, gpos] in rec.aligned_pairs() { + let (r, g, v) = (rpos as usize, gpos as usize, gpos - lb); + + if g < (lb as usize) || g >= (ub as usize) || v < 0 { + continue; + } + + let bp = seq[r]; + + let freq = match rec.is_reverse() { + true => &mut reverse_freq[v as usize], + _ => &mut forward_freq[v as usize], + }; + + debug_assert_eq!(freq.gpos, gpos); + freq.tot += 1; + match bp { + b'A' | b'a' => freq.a += 1, + b'T' | b't' => freq.t += 1, + b'G' | b'g' => freq.g += 1, + b'C' | b'c' => freq.c += 1, + _ => (), + } + } + } + _ => { + // report error message? + } + } + } + + Ok(DnaFreqVecs { + forward: forward_freq, + reverse: reverse_freq, + }) +} diff --git a/src/main.rs b/src/main.rs index fad87ed..afcb03e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,79 @@ +mod aggregate; +mod util; + +use aggregate::run_aggregate; + +use anyhow; +// use anyhow::{self}; use clap::{Args, Parser, Subcommand}; +#[derive(Parser)] +#[command(version, about, long_about=None)] +#[command(propagate_version = true)] +struct Cli { + #[command(subcommand)] + commands: Commands, +} + +#[derive(Subcommand)] +enum Commands { + /// aggregate putative editing sites + Aggregate(AggregateArgs), + /// sifting through potential sites + Sift, +} + +#[derive(Args)] +struct SiftArgs { + /// GFF file + #[arg(short, long)] + gff: Box, +} + +#[derive(Args)] +struct AggregateArgs { + /// foreground BAM file + #[arg(short, long)] + fg_bam: Box, + + /// background BAM file + #[arg(short, long)] + bg_bam: Box, + + /// foreground BAI file (default: .bai) + #[arg(long)] + fg_bai: Option>, + + /// background BAI file (default: .bai) + #[arg(long)] + bg_bai: Option>, + + /// number of threads + #[arg(short, long)] + threads: Option, + + /// block size (default: 10000) + #[arg(long)] + bsize: Option, + + /// output file header + #[arg(short, long)] + output: Option>, +} + /// main CLI for FAVA -fn main() { - // +/// +fn main() -> anyhow::Result<()> { + let cli = Cli::parse(); + + match &cli.commands { + Commands::Aggregate(args) => { + // + run_aggregate(args)?; + } + _ => { + todo!("sifting"); + } + } + Ok(()) } diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..3a0d904 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,39 @@ +use rust_htslib::bam; +use std::thread; +use std::path::Path; + +pub fn check_bam_index( + bam_file_name: &str, + idx_file_name: Option<&str>, +) -> anyhow::Result> { + // log::info!("Checking BAM index"); + + let idx_file = match idx_file_name { + Some(x) => String::from(x), + None => format!("{}.bai", bam_file_name), + }; + + if Path::new(&idx_file).exists() { + return Ok(idx_file.into_boxed_str()); + } + + let ncore = thread::available_parallelism() + .expect("failed to figure out number of cores") + .get(); + + // log::info!( + // "Creating a new index file {} using {} cores", + // &idx_file, + // &ncore + // ); + + // need to build an index for this bam file + bam::index::build( + bam_file_name, + Some(&idx_file), + bam::index::Type::Bai, + ncore as u32, + )?; + + Ok(idx_file.into_boxed_str()) +}