Skip to content

Commit

Permalink
Fine-tune CRAM container/slice partitioning
Browse files Browse the repository at this point in the history
  • Loading branch information
athos committed Aug 28, 2024
1 parent b0b8cb4 commit ed0fbcb
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 24 deletions.
82 changes: 82 additions & 0 deletions src/cljam/io/cram/encode/partitioning.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
(ns cljam.io.cram.encode.partitioning
(:require [cljam.io.sam.util.header :as sam.header])
(:import [java.util ArrayList List]))

(defn- slice-record-collector
[header {:keys [^long records-per-slice ^long min-single-ref-slice-size]
:or {records-per-slice 10000, min-single-ref-slice-size 1000}}]
(let [sort-by-coord? (= (sam.header/sort-order header) sam.header/order-coordinate)
multi-upper-bound (if sort-by-coord?
min-single-ref-slice-size
records-per-slice)]
(fn [empty-container? alns]
(let [slice-records (ArrayList.)]
(loop [ref-state nil, alns alns]
(if (seq alns)
(let [[{:keys [rname] :as aln} & more] alns
n-records (.size slice-records)]
(case ref-state
nil
(let [ref-state' (if (= rname "*") :unmapped rname)]
(.add slice-records aln)
(recur ref-state' more))

:unmapped
(if (< n-records records-per-slice)
(let [ref-state' (cond (= rname "*") ref-state
sort-by-coord? (throw
(ex-info
(str "Unmapped records "
"must be last")
{}))
:else :multi-ref)]
(.add slice-records aln)
(recur ref-state' more))
[ref-state slice-records alns])

:multi-ref
(if (< n-records multi-upper-bound)
(do (.add slice-records aln)
(recur ref-state more))
[ref-state slice-records alns])

(if (= ref-state rname)
(if (< n-records records-per-slice)
(do (.add slice-records aln)
(recur ref-state more))
[ref-state slice-records alns])
(if (and empty-container? (< n-records min-single-ref-slice-size))
(do (.add slice-records aln)
(recur :multi-ref more))
[ref-state slice-records alns]))))
[ref-state slice-records alns]))))))

(defn with-each-container
"Partition the given alignment records into containers, which are represented
as a List of Lists of record maps, and call the function f with them."
[header options alns f]
(let [{:keys [^long slices-per-container] :or {slices-per-container 1}} options
collect-fn (slice-record-collector header options)
container-records (ArrayList.)]
(loop [ref-state nil, written 0, ready 0, alns alns]
(if (seq alns)
(let [n-slices (.size container-records)
[ref-state' ^List slice-records alns'] (collect-fn (zero? n-slices) alns)]
(if (or (= n-slices slices-per-container)
(= ref-state' :multi-ref)
(and (some? ref-state) (not= ref-state ref-state')))
(let [written' (+ written ready)]
(when-not (.isEmpty container-records)
(f written container-records))
(.clear container-records)
(if (= ref-state' :multi-ref)
(do (f written' (doto (ArrayList.) (.add slice-records)))
(recur nil (+ written' (.size slice-records)) 0 alns'))
(let [ready' (.size slice-records)]
(.add container-records slice-records)
(recur ref-state' written' ready' alns'))))
(let [ready' (+ ready (.size slice-records))]
(.add container-records slice-records)
(recur ref-state' written ready' alns'))))
(when-not (.isEmpty container-records)
(f written container-records))))))
16 changes: 8 additions & 8 deletions src/cljam/io/cram/encode/record.clj
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
(ns cljam.io.cram.encode.record
(:require [cljam.io.cram.encode.alignment-stats :as stats]
[cljam.io.cram.encode.tag-dict :as tag-dict]
[cljam.io.cram.seq-resolver.protocol :as resolver]
[cljam.io.sam.util.cigar :as sam.cigar]
[cljam.io.sam.util.flag :as sam.flag]
[cljam.io.sam.util.option :as sam.option]
[cljam.io.cram.seq-resolver.protocol :as resolver]
[cljam.io.cram.encode.tag-dict :as tag-dict])
(:import [java.util Arrays]))
[cljam.io.sam.util.option :as sam.option])
(:import [java.util Arrays List]))

(defn- ref-index [rname->idx rname]
(if (= rname "*")
Expand Down Expand Up @@ -202,9 +202,9 @@
"Preprocesses slice records to calculate some record fields prior to record
encoding that are necessary for the CRAM writer to generate some header
components."
[{:keys [rname->idx subst-mat seq-resolver tag-dict-builder]} ^objects records]
(dotimes [i (alength records)]
(let [record (aget records i)
[{:keys [rname->idx subst-mat seq-resolver tag-dict-builder]} ^List records]
(dotimes [i (.size records)]
(let [record (.get records i)
;; these flag bits of CF are hard-coded at the moment:
;; - 0x01: quality scores stored as array (true)
;; - 0x02: detached (true)
Expand All @@ -217,4 +217,4 @@
record' (assoc record
::flag cf ::ref-index ri ::end end
::features fs ::tags-index tags-id)]
(aset records i record'))))
(.set records i record'))))
24 changes: 8 additions & 16 deletions src/cljam/io/cram/writer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
(:require [cljam.io.crai :as crai]
[cljam.io.cram.encode.alignment-stats :as stats]
[cljam.io.cram.encode.context :as context]
[cljam.io.cram.encode.partitioning :as partition]
[cljam.io.cram.encode.record :as record]
[cljam.io.cram.encode.structure :as struct]
[cljam.io.cram.seq-resolver.protocol :as resolver]
Expand Down Expand Up @@ -54,9 +55,7 @@
preservation-map
seq-resolver)
{:keys [ds-compressor-overrides tag-compressor-overrides]} options]
(dotimes [i (alength container-records)]
(let [slice-records (aget container-records i)]
(record/preprocess-slice-records container-ctx slice-records)))
(run! (partial record/preprocess-slice-records container-ctx) container-records)
(context/finalize-container-context container-ctx
ds-compressor-overrides
tag-compressor-overrides)))
Expand Down Expand Up @@ -180,8 +179,7 @@
compression-header-block (generate-compression-header-block container-ctx)
container-header (generate-container-header compression-header-block slices)
^DataOutputStream out (.-stream wtr)
container-offset (.size out)
counter' (:counter (peek slices))]
container-offset (.size out)]
(struct/encode-container-header out (assoc container-header :counter counter))
(.write out compression-header-block)
(run! (fn [{:keys [^bytes header-block data-blocks]}]
Expand All @@ -190,17 +188,11 @@
slices)
(when-let [index-writer (.-index-writer wtr)]
(write-index-entries index-writer container-offset container-header
slices container-records))
counter'))

(defn- partition-alignments [slices-per-container records-per-slice alns]
(->> alns
(partition-all records-per-slice)
(map object-array)
(partition-all slices-per-container)
(map object-array)))
slices container-records))))

(defn write-alignments
"Writes all the given alignments, which is a sequence of alignment maps."
[wtr alns header]
(reduce (partial write-container wtr header) 0 (partition-alignments 1 10000 alns)))
[^CRAMWriter wtr alns header]
(partition/with-each-container header (.-options wtr) alns
(fn [counter container-records]
(write-container wtr header counter container-records))))

0 comments on commit ed0fbcb

Please sign in to comment.