diff --git a/Cargo.lock b/Cargo.lock index 86f89e7..364a01a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1882,7 +1882,7 @@ checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" [[package]] name = "seqair" version = "0.1.0" -source = "git+https://tangled.org/softleif.se/seqair.git?rev=28766a811596437f685f76aac753f963cde96f7f#28766a811596437f685f76aac753f963cde96f7f" +source = "git+https://github.com/sstadick/seqair?rev=6d9251ccc7681c9ed6b26be24ae61cf59c0a8de9#6d9251ccc7681c9ed6b26be24ae61cf59c0a8de9" dependencies = [ "bytemuck", "bzip2", @@ -1902,7 +1902,7 @@ dependencies = [ [[package]] name = "seqair-types" version = "0.1.0" -source = "git+https://tangled.org/softleif.se/seqair.git?rev=28766a811596437f685f76aac753f963cde96f7f#28766a811596437f685f76aac753f963cde96f7f" +source = "git+https://github.com/sstadick/seqair?rev=6d9251ccc7681c9ed6b26be24ae61cf59c0a8de9#6d9251ccc7681c9ed6b26be24ae61cf59c0a8de9" dependencies = [ "bytemuck", "serde", diff --git a/Cargo.toml b/Cargo.toml index 4861906..54982c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,8 +42,8 @@ lru_time_cache = "0.11.1" num_cpus = "1.13.0" rayon = "1.4.0" rust-htslib = {version = "0.51", default-features = false, features = ["libdeflate"]} -seqair = { git = "https://tangled.org/softleif.se/seqair.git", rev = "28766a811596437f685f76aac753f963cde96f7f", optional = true } -seqair-types = { git = "https://tangled.org/softleif.se/seqair.git", rev = "28766a811596437f685f76aac753f963cde96f7f", optional = true } +seqair = { git = "https://github.com/sstadick/seqair", rev = "6d9251ccc7681c9ed6b26be24ae61cf59c0a8de9", optional = true } +seqair-types = { git = "https://github.com/sstadick/seqair", rev = "6d9251ccc7681c9ed6b26be24ae61cf59c0a8de9", optional = true } rust-lapper = "1.0" serde = { version = "1.0.116", features = ["derive"] } smartstring = { version = "1.0.1", features = ["serde"] } diff --git a/src/commands/base_depth.rs b/src/commands/base_depth.rs index 37a9ea9..009dd27 100644 --- a/src/commands/base_depth.rs +++ b/src/commands/base_depth.rs @@ -6,6 +6,8 @@ use anyhow::Result; use bio::io::fasta::IndexedReader; use log::*; +#[cfg(feature = "seqair-pileup")] +use perbase_lib::position::pileup_position::SeqairPileupPositionAccumulator; use perbase_lib::{ par_granges::{self, RegionProcessor}, position::{Position, mate_fix::MateResolutionStrategy, pileup_position::PileupPosition}, @@ -404,27 +406,30 @@ impl BaseProcessor { engine.set_max_depth(self.max_depth); let mut result = Vec::new(); - while let Some(column) = engine.pileups() { - let pileup_depth = u32::try_from(column.depth()).unwrap_or(u32::MAX); - let mut pos = if self.mate_fix { - PileupPosition::from_seqair_column_mate_aware( + if self.mate_fix { + while let Some(column) = engine.pileups() { + let pileup_depth = u32::try_from(column.depth()).unwrap_or(u32::MAX); + let mut pos = PileupPosition::from_seqair_column_mate_aware( ref_name.clone(), &column, &self.read_filter, self.min_base_quality_score, self.mate_fix_strategy, - ) - } else { - PileupPosition::from_seqair_column( - ref_name.clone(), - &column, - &self.read_filter, - self.min_base_quality_score, - ) - }; + ); - self.finalize_pileup_position(&mut pos, pileup_depth); - result.push(pos); + self.finalize_pileup_position(&mut pos, pileup_depth); + result.push(pos); + } + } else { + let mut accumulator = SeqairPileupPositionAccumulator::new( + ref_name.clone(), + &self.read_filter, + self.min_base_quality_score, + ); + while let Some(mut accumulated) = engine.pileup_with(&mut accumulator) { + self.finalize_pileup_position(&mut accumulated.position, accumulated.pileup_depth); + result.push(accumulated.position); + } } self.fill_zero_positions(result, ref_name, start, stop) diff --git a/src/commands/only_depth.rs b/src/commands/only_depth.rs index 11ad8b0..46e3498 100644 --- a/src/commands/only_depth.rs +++ b/src/commands/only_depth.rs @@ -93,7 +93,7 @@ pub struct OnlyDepth { #[structopt(long, short = "x")] fast_mode: bool, - /// Use the experimental seqair-backed reader with raw pre-filtering. Requires unmapped reads to be excluded (e.g. -F 4, or -F 3852 instead of -F 3848). + /// Use the experimental seqair-backed reader with raw pre-filtering. #[cfg(feature = "seqair-pileup")] #[structopt(long = "seqair")] seqair: bool, @@ -197,12 +197,6 @@ impl OnlyDepth { "The experimental seqair only-depth path currently supports BAM input only" ); } - #[cfg(feature = "seqair-pileup")] - if self.seqair && self.exclude_flags & 0x4 == 0 { - anyhow::bail!( - "The experimental seqair only-depth path requires unmapped reads to be excluded; add -F 4, or use -F 3852 if you would otherwise use -F 3848" - ); - } let cpus = utils::determine_allowed_cpus(self.threads)?; let mut writer = utils::get_writer( @@ -347,7 +341,16 @@ impl OnlyDepthProcessor { fn with_seqair(mut self, seqair: bool) -> Result { self.seqair = seqair; if seqair { - self.seqair_reader_template = Some(seqair::reader::IndexedReader::open(&self.reads)?); + let reader = match seqair::reader::IndexedReader::open(&self.reads)? { + // `only-depth` consumes raw fetched records (like htslib `view`), not seqair's + // pileup stream. Keep placed-unmapped BAM records in the fetch stream and let + // perbase's normal read-filter semantics decide whether `-F 4` excludes them. + seqair::reader::IndexedReader::Bam(reader) => { + seqair::reader::IndexedReader::Bam(reader.keep_unmapped(true)) + } + reader => reader, + }; + self.seqair_reader_template = Some(reader); } Ok(self) } @@ -689,6 +692,24 @@ impl OnlyDepthProcessor { } } + #[cfg(feature = "seqair-pileup")] + #[inline(always)] + fn seqair_fast_reference_stop( + rec: &seqair::bam::record_store::SlimRecord, + cigar: &[seqair::bam::CigarOp], + ) -> u32 { + // Match rust-htslib's `Record::reference_end()` behavior for placed-unmapped + // records: even if a CIGAR is present, fast-mode treats the record as a + // one-base placed interval. Normal `only-depth` still walks CIGAR blocks. + if rec.flags.is_unmapped() { + (*rec.pos) + .checked_add(1) + .expect("seqair unmapped end position overflow") + } else { + Self::seqair_reference_stop(rec, cigar) + } + } + #[cfg(feature = "seqair-pileup")] fn merge_seqair_mate_intervals( counter: &mut [i32], @@ -716,11 +737,9 @@ impl OnlyDepthProcessor { ) -> Vec { let mut counter: Vec = vec![0; (stop - start) as usize]; - for idx in 0..store.len() { - let idx = u32::try_from(idx).expect("seqair record index"); - let rec = store.record(idx); + for rec in store.records() { let mut ref_pos = *rec.pos; - for op in store.cigar(idx) { + for op in rec.cigar(&store).expect("seqair CIGAR") { let len = op.len(); match op.op_type() { seqair::bam::cigar::CigarOpType::Match @@ -773,14 +792,13 @@ impl OnlyDepthProcessor { let mut counter: Vec = vec![0; (stop - start) as usize]; let mut maties: HashMap>> = HashMap::new(); - for idx in 0..store.len() { - let idx = u32::try_from(idx).expect("seqair record index"); - let rec = store.record(idx); - let cigar = store.cigar(idx); + for rec in store.records() { + let cigar = rec.cigar(&store).expect("seqair CIGAR"); let reference_stop = Self::seqair_reference_stop(rec, cigar); let maybe_mate_key = if OnlyDepth::maybe_overlaps_mate_seqair(rec, reference_stop) { Some(String::from( - std::str::from_utf8(store.qname(idx)).expect("Convert qname"), + std::str::from_utf8(rec.qname(&store).expect("seqair qname")) + .expect("Convert qname"), )) } else { None @@ -848,10 +866,9 @@ impl OnlyDepthProcessor { ) -> Vec { let mut counter: Vec = vec![0; (stop - start) as usize]; - for idx in 0..store.len() { - let idx = u32::try_from(idx).expect("seqair record index"); - let rec = store.record(idx); - let reference_stop = Self::seqair_reference_stop(rec, store.cigar(idx)); + for rec in store.records() { + let reference_stop = + Self::seqair_fast_reference_stop(rec, rec.cigar(&store).expect("seqair CIGAR")); if let Some((adjusted_start, adjusted_stop, dont_count_stop)) = Self::adjusted_seqair_interval(counter.len(), *rec.pos, reference_stop, start, stop) { @@ -878,16 +895,17 @@ impl OnlyDepthProcessor { let mut counter: Vec = vec![0; (stop - start) as usize]; let mut maties: HashMap>> = HashMap::new(); - for idx in 0..store.len() { - let idx = u32::try_from(idx).expect("seqair record index"); - let rec = store.record(idx); - let reference_stop = Self::seqair_reference_stop(rec, store.cigar(idx)); + for rec in store.records() { + let reference_stop = + Self::seqair_fast_reference_stop(rec, rec.cigar(&store).expect("seqair CIGAR")); if let Some((adjusted_start, adjusted_stop, dont_count_stop)) = Self::adjusted_seqair_interval(counter.len(), *rec.pos, reference_stop, start, stop) { if OnlyDepth::maybe_overlaps_mate_seqair(rec, reference_stop) { - let qname = - String::from(std::str::from_utf8(store.qname(idx)).expect("Convert qname")); + let qname = String::from( + std::str::from_utf8(rec.qname(&store).expect("seqair qname")) + .expect("Convert qname"), + ); maties.entry(qname).or_default().push(Interval { start: adjusted_start, stop: adjusted_stop, @@ -1385,6 +1403,60 @@ mod tests { assert_eq!(htslib_positions, seqair_positions); } + #[cfg(feature = "seqair-pileup")] + #[rstest(fast_mode => [false, true])] + fn seqair_only_depth_keeps_placed_unmapped_like_htslib_records(fast_mode: bool) { + let tempdir = tempdir().unwrap(); + let bam_path = tempdir.path().join("placed_unmapped.bam"); + + let mut header = bam::header::Header::new(); + let mut chr1 = bam::header::HeaderRecord::new(b"SQ"); + chr1.push_tag(b"SN", &"chr1".to_owned()); + chr1.push_tag(b"LN", &"100".to_owned()); + header.push_record(&chr1); + let view = bam::HeaderView::from_header(&header); + + let records = [ + Record::from_sam( + &view, + b"MAPPED\t0\tchr1\t1\t40\t10M\t*\t0\t0\tAAAAAAAAAA\tIIIIIIIIII", + ) + .unwrap(), + // Placed-unmapped records can be returned by htslib's raw record iterator after + // region fetch. seqair 0.1.0's `keep_unmapped(true)` lets perbase apply the same + // user read filters instead of dropping them before `filter_raw`. + Record::from_sam( + &view, + b"PLACED_UNMAPPED\t4\tchr1\t5\t0\t10M\t*\t0\t0\tCCCCCCCCCC\tIIIIIIIIII", + ) + .unwrap(), + ]; + + let mut writer = bam::Writer::from_path(&bam_path, &header, bam::Format::Bam).unwrap(); + for record in &records { + writer.write(record).unwrap(); + } + drop(writer); + bam::index::build(&bam_path, None, bam::index::Type::Bai, 1).unwrap(); + + let htslib_positions = collect_only_depth_tuples( + bam_path.clone(), + fast_mode, + false, + false, + DefaultReadFilter::new(0, 0, 0), + ); + let seqair_positions = collect_only_depth_tuples( + bam_path, + fast_mode, + false, + true, + DefaultReadFilter::new(0, 0, 0), + ); + + assert_eq!(htslib_positions, seqair_positions); + } + #[rstest( positions, awareness_modifier, diff --git a/src/lib/position/pileup_position.rs b/src/lib/position/pileup_position.rs index e7fcae6..ecce5d6 100644 --- a/src/lib/position/pileup_position.rs +++ b/src/lib/position/pileup_position.rs @@ -442,6 +442,70 @@ impl PileupPosition { } } +/// A position produced by seqair's custom pileup accumulator, plus raw pileup depth. +#[cfg(feature = "seqair-pileup")] +pub struct SeqairAccumulatedPosition { + /// Perbase output position after read/base filtering has been applied. + pub position: PileupPosition, + /// Raw emitted pileup depth before perbase read/refskip filtering. + pub pileup_depth: u32, +} + +/// Accumulate non-mate-aware perbase counts directly from seqair pileup observations. +#[cfg(feature = "seqair-pileup")] +pub struct SeqairPileupPositionAccumulator<'a, F> { + ref_seq: String, + read_filter: &'a F, + base_filter: Option, + position: Option, +} + +#[cfg(feature = "seqair-pileup")] +impl<'a, F> SeqairPileupPositionAccumulator<'a, F> { + /// Create an accumulator for one reference sequence. + pub fn new(ref_seq: String, read_filter: &'a F, base_filter: Option) -> Self { + Self { + ref_seq, + read_filter, + base_filter, + position: None, + } + } +} + +#[cfg(feature = "seqair-pileup")] +impl seqair::bam::pileup::PileupColumnAccumulator + for SeqairPileupPositionAccumulator<'_, F> +{ + type Output = SeqairAccumulatedPosition; + + fn begin_column(&mut self, pos: seqair::bam::Pos0, _reference_base: seqair_types::Base) { + self.position = Some(PileupPosition::new(self.ref_seq.clone(), *pos)); + } + + fn observe_alignment( + &mut self, + alignment: seqair::bam::pileup::PileupAlignment, + _store: &seqair::bam::RecordStore, + ) { + let Some(position) = self.position.as_mut() else { + return; + }; + position.depth = position.depth.saturating_add(1); + position.update_seqair_raw(&alignment, self.read_filter, self.base_filter); + } + + fn finish_column( + &mut self, + context: seqair::bam::pileup::PileupColumnContext<'_, U>, + ) -> Option { + Some(SeqairAccumulatedPosition { + position: self.position.take()?, + pileup_depth: u32::try_from(context.depth()).unwrap_or(u32::MAX), + }) + } +} + #[cfg(test)] mod tests { use super::*; @@ -995,13 +1059,122 @@ mod tests { } } - /// TODO: enable once seqair yields pileup observations for empty SEQ (`*`) reads. - /// Current htslib behavior, covered above, counts those reads toward depth as `N`. - /// seqair currently appears to drop them because no base/quality exists at qpos. + /// seqair 0.1.0 yields `Base::Unknown` / unavailable quality for empty-SEQ (`*`) + /// pileup observations, matching htslib/perbase's depth-as-N behavior. #[cfg(feature = "seqair-pileup")] #[test] - #[ignore = "requires upstream seqair empty-SEQ pileup support"] fn seqair_empty_seq_matches_htslib_after_upstream_fix() { - todo!("construct the empty-SEQ fixture above with seqair and compare against htslib output") + use rust_htslib::bam::{IndexedReader, Writer, index}; + use std::path::Path; + use tempfile::tempdir; + + fn htslib_position( + bam_path: &Path, + read_filter: &DefaultReadFilter, + base_filter: Option, + ) -> PileupPosition { + let mut reader = IndexedReader::from_path(bam_path).unwrap(); + let header_view = reader.header().clone(); + reader.fetch(("chr1", 0, 1)).unwrap(); + for pileup_result in reader.pileup() { + let pileup = pileup_result.unwrap(); + if pileup.pos() == 0 { + return PileupPosition::from_pileup( + pileup, + &header_view, + read_filter, + base_filter, + ); + } + } + panic!("htslib pileup did not yield chr1:1"); + } + + fn seqair_position( + bam_path: &Path, + read_filter: &DefaultReadFilter, + base_filter: Option, + ) -> PileupPosition { + let mut reader = seqair::reader::IndexedReader::open(bam_path).unwrap(); + let tid = reader.header().tid("chr1").unwrap(); + let start = seqair::bam::Pos0::new(0).unwrap(); + let end = seqair::bam::Pos0::new(0).unwrap(); + let mut store = seqair::bam::RecordStore::new(); + reader.fetch_into(tid, start, end, &mut store).unwrap(); + let mut engine = seqair::bam::pileup::PileupEngine::new(store, start, end); + while let Some(column) = engine.pileups() { + if *column.pos() == 0 { + return PileupPosition::from_seqair_column( + String::from("chr1"), + &column, + read_filter, + base_filter, + ); + } + } + panic!("seqair pileup did not yield chr1:1"); + } + + fn seqair_accumulated_position( + bam_path: &Path, + read_filter: &DefaultReadFilter, + base_filter: Option, + ) -> PileupPosition { + let mut reader = seqair::reader::IndexedReader::open(bam_path).unwrap(); + let tid = reader.header().tid("chr1").unwrap(); + let start = seqair::bam::Pos0::new(0).unwrap(); + let end = seqair::bam::Pos0::new(0).unwrap(); + let mut store = seqair::bam::RecordStore::new(); + reader.fetch_into(tid, start, end, &mut store).unwrap(); + let mut engine = seqair::bam::pileup::PileupEngine::new(store, start, end); + let mut accumulator = SeqairPileupPositionAccumulator::new( + String::from("chr1"), + read_filter, + base_filter, + ); + while let Some(accumulated) = engine.pileup_with(&mut accumulator) { + if accumulated.position.pos == 0 { + return accumulated.position; + } + } + panic!("seqair accumulated pileup did not yield chr1:1"); + } + + let tempdir = tempdir().unwrap(); + let bam_path = tempdir.path().join("empty_seq_seqair.bam"); + + let mut header = bam::header::Header::new(); + let mut chr1 = bam::header::HeaderRecord::new(b"SQ"); + chr1.push_tag(b"SN", &"chr1".to_owned()); + chr1.push_tag(b"LN", &"100".to_owned()); + header.push_record(&chr1); + let view = bam::HeaderView::from_header(&header); + + let normal_record = Record::from_sam( + &view, + b"NORMAL\t0\tchr1\t1\t40\t25M\t*\t0\t0\tAAAAAAAAAAAAAAAAAAAAAAAAA\tIIIIIIIIIIIIIIIIIIIIIIIII", + ) + .unwrap(); + let empty_seq_record = + Record::from_sam(&view, b"EMPTY_SEQ\t0\tchr1\t1\t40\t25M\t*\t0\t0\t*\t*").unwrap(); + + let mut writer = Writer::from_path(&bam_path, &header, bam::Format::Bam).unwrap(); + writer.write(&normal_record).unwrap(); + writer.write(&empty_seq_record).unwrap(); + drop(writer); + index::build(&bam_path, None, index::Type::Bai, 1).unwrap(); + + let read_filter = DefaultReadFilter::new(0, 0, 0); + for base_filter in [None, Some(20)] { + let htslib = htslib_position(&bam_path, &read_filter, base_filter); + let seqair = seqair_position(&bam_path, &read_filter, base_filter); + let seqair_accumulated = + seqair_accumulated_position(&bam_path, &read_filter, base_filter); + assert_eq!(htslib, seqair); + assert_eq!(htslib, seqair_accumulated); + assert_eq!(seqair_accumulated.depth, 2); + assert_eq!(seqair_accumulated.a, 1); + assert_eq!(seqair_accumulated.n, 1); + } } }