Skip to content

Commit

Permalink
Merge pull request #322 from chrovis/feature/cram-slice-partition
Browse files Browse the repository at this point in the history
Fine-grained partition of CRAM containers and slices
  • Loading branch information
ToshimitsuArai authored Aug 30, 2024
2 parents 79205f6 + 7c93d23 commit b70109a
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 29 deletions.
6 changes: 6 additions & 0 deletions src/cljam/io/cram.clj
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@
a sequence reader that reads sequences from the reference file.
This may be omitted only when the CRAM file to be read does not require
a reference file.
- records-per-slice: The maximum number of records a slice may contain.
Defaults to 10000.
- slices-per-container: The maximum number of slices a container may contain.
Defaults to 1.
- min-single-ref-slice-size: The minimum number of records required to emit
a single-reference slice. Defaults to 1000.
- ds-compressor-overrides: A function to override data series compressors.
Given a data series keyword, returns a keyword or a set of keywords
representing compression method. It may return another function to add
Expand Down
89 changes: 89 additions & 0 deletions src/cljam/io/cram/encode/partitioning.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
(ns cljam.io.cram.encode.partitioning
(:require [cljam.io.sam.util.header :as sam.header])
(:import [java.util ArrayList List]))

(defn- slice-record-collector
[header {:keys [^long records-per-slice ^long min-single-ref-slice-size]
:or {records-per-slice 10000, min-single-ref-slice-size 1000}}]
(let [sort-by-coord? (= (sam.header/sort-order header) sam.header/order-coordinate)
;; CRAM files sorted by coord can easily get back from a multi-ref state
;; to a single-ref state. To support this, multi-ref slices should be kept
;; as small as possible
multi-upper-bound (if sort-by-coord?
min-single-ref-slice-size
records-per-slice)]
(fn [empty-container? alns]
(let [slice-records (ArrayList.)]
(loop [ref-state nil, alns alns]
(if (seq alns)
(let [[{:keys [rname] :as aln} & more] alns
n-records (.size slice-records)]
(case ref-state
nil
(let [ref-state' (if (= rname "*") :unmapped rname)]
(.add slice-records aln)
(recur ref-state' more))

:unmapped
(if (< n-records records-per-slice)
(let [ref-state' (cond (= rname "*") ref-state
sort-by-coord? (throw
(ex-info
(str "Unmapped records "
"must be last")
{}))
:else :multi-ref)]
(.add slice-records aln)
(recur ref-state' more))
[ref-state slice-records alns])

:multi-ref
(if (< n-records multi-upper-bound)
(do (.add slice-records aln)
(recur ref-state more))
[ref-state slice-records alns])

(if (= ref-state rname)
(if (< n-records records-per-slice)
(do (.add slice-records aln)
(recur ref-state more))
[ref-state slice-records alns])
;; If the container already contains one or more single-ref
;; slices, instead of creating and adding a new multi-ref slice
;; to that container, add the accumulated single-ref slice
;; to the container and add the current record to the next slice
(if (and empty-container? (< n-records min-single-ref-slice-size))
(do (.add slice-records aln)
(recur :multi-ref more))
[ref-state slice-records alns]))))
[ref-state slice-records alns]))))))

(defn with-each-container
"Partitions the given alignment records into containers, which are represented
as a List of Lists of record maps, and calls the function f with them."
[header options alns f]
(let [{:keys [^long slices-per-container] :or {slices-per-container 1}} options
collect-fn (slice-record-collector header options)
container-records (ArrayList.)]
(loop [ref-state nil, written 0, ready 0, alns alns]
(if (seq alns)
(let [n-slices (.size container-records)
[ref-state' ^List slice-records alns'] (collect-fn (zero? n-slices) alns)]
(if (or (= n-slices slices-per-container)
(= ref-state' :multi-ref)
(and (some? ref-state) (not= ref-state ref-state')))
(let [written' (+ written ready)]
(when-not (.isEmpty container-records)
(f written container-records))
(.clear container-records)
(if (= ref-state' :multi-ref)
(do (f written' (doto (ArrayList.) (.add slice-records)))
(recur nil (+ written' (.size slice-records)) 0 alns'))
(let [ready' (.size slice-records)]
(.add container-records slice-records)
(recur ref-state' written' ready' alns'))))
(let [ready' (+ ready (.size slice-records))]
(.add container-records slice-records)
(recur ref-state' written ready' alns'))))
(when-not (.isEmpty container-records)
(f written container-records))))))
16 changes: 8 additions & 8 deletions src/cljam/io/cram/encode/record.clj
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
(ns cljam.io.cram.encode.record
(:require [cljam.io.cram.encode.alignment-stats :as stats]
[cljam.io.cram.encode.tag-dict :as tag-dict]
[cljam.io.cram.seq-resolver.protocol :as resolver]
[cljam.io.sam.util.cigar :as sam.cigar]
[cljam.io.sam.util.flag :as sam.flag]
[cljam.io.sam.util.option :as sam.option]
[cljam.io.cram.seq-resolver.protocol :as resolver]
[cljam.io.cram.encode.tag-dict :as tag-dict])
(:import [java.util Arrays]))
[cljam.io.sam.util.option :as sam.option])
(:import [java.util Arrays List]))

(defn- ref-index [rname->idx rname]
(if (= rname "*")
Expand Down Expand Up @@ -202,9 +202,9 @@
"Preprocesses slice records to calculate some record fields prior to record
encoding that are necessary for the CRAM writer to generate some header
components."
[{:keys [rname->idx subst-mat seq-resolver tag-dict-builder]} ^objects records]
(dotimes [i (alength records)]
(let [record (aget records i)
[{:keys [rname->idx subst-mat seq-resolver tag-dict-builder]} ^List records]
(dotimes [i (.size records)]
(let [record (.get records i)
;; these flag bits of CF are hard-coded at the moment:
;; - 0x01: quality scores stored as array (true)
;; - 0x02: detached (true)
Expand All @@ -217,4 +217,4 @@
record' (assoc record
::flag cf ::ref-index ri ::end end
::features fs ::tags-index tags-id)]
(aset records i record'))))
(.set records i record'))))
24 changes: 8 additions & 16 deletions src/cljam/io/cram/writer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
(:require [cljam.io.crai :as crai]
[cljam.io.cram.encode.alignment-stats :as stats]
[cljam.io.cram.encode.context :as context]
[cljam.io.cram.encode.partitioning :as partition]
[cljam.io.cram.encode.record :as record]
[cljam.io.cram.encode.structure :as struct]
[cljam.io.cram.seq-resolver.protocol :as resolver]
Expand Down Expand Up @@ -54,9 +55,7 @@
preservation-map
seq-resolver)
{:keys [ds-compressor-overrides tag-compressor-overrides]} options]
(dotimes [i (alength container-records)]
(let [slice-records (aget container-records i)]
(record/preprocess-slice-records container-ctx slice-records)))
(run! (partial record/preprocess-slice-records container-ctx) container-records)
(context/finalize-container-context container-ctx
ds-compressor-overrides
tag-compressor-overrides)))
Expand Down Expand Up @@ -180,8 +179,7 @@
compression-header-block (generate-compression-header-block container-ctx)
container-header (generate-container-header compression-header-block slices)
^DataOutputStream out (.-stream wtr)
container-offset (.size out)
counter' (:counter (peek slices))]
container-offset (.size out)]
(struct/encode-container-header out (assoc container-header :counter counter))
(.write out compression-header-block)
(run! (fn [{:keys [^bytes header-block data-blocks]}]
Expand All @@ -190,17 +188,11 @@
slices)
(when-let [index-writer (.-index-writer wtr)]
(write-index-entries index-writer container-offset container-header
slices container-records))
counter'))

(defn- partition-alignments [slices-per-container records-per-slice alns]
(->> alns
(partition-all records-per-slice)
(map object-array)
(partition-all slices-per-container)
(map object-array)))
slices container-records))))

(defn write-alignments
"Writes all the given alignments, which is a sequence of alignment maps."
[wtr alns header]
(reduce (partial write-container wtr header) 0 (partition-alignments 1 10000 alns)))
[^CRAMWriter wtr alns header]
(partition/with-each-container header (.-options wtr) alns
(fn [counter container-records]
(write-container wtr header counter container-records))))
201 changes: 201 additions & 0 deletions test/cljam/io/cram/encode/partitioning_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
(ns cljam.io.cram.encode.partitioning-test
(:require [cljam.io.cram.encode.partitioning :as partition]
[clojure.test :refer [are deftest testing]]))

(defn- partition-alignments [header options alns]
(let [acc (volatile! [])]
(partition/with-each-container header options alns
(fn [counter container-records]
(vswap! acc conj [counter (mapv vec container-records)])))
@acc))

(deftest with-each-container-test
(let [options {:slices-per-container 2
:records-per-slice 3
:min-single-ref-slice-size 2}]
(testing "sorted by coord"
(let [header {:HD {:SO "coordinate"}}]
(are [input expected]
(= expected
(partition-alignments header options input))
[]
[]

[{:qname "q1", :rname "chr1"}]
[[0 [[{:qname "q1", :rname "chr1"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr1"}
{:qname "q4", :rname "chr1"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr1"}]
[{:qname "q4", :rname "chr1"}]]]]

(map (fn [i] {:qname (str \q (inc (long i))), :rname "chr1"}) (range 6))
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr1"}]
[{:qname "q4", :rname "chr1"}
{:qname "q5", :rname "chr1"}
{:qname "q6", :rname "chr1"}]]]]

(map (fn [i] {:qname (str \q (inc (long i))), :rname "chr1"}) (range 7))
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr1"}]
[{:qname "q4", :rname "chr1"}
{:qname "q5", :rname "chr1"}
{:qname "q6", :rname "chr1"}]]]
[6 [[{:qname "q7", :rname "chr1"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr2"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}]]]
[2 [[{:qname "q3", :rname "chr2"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr1"}
{:qname "q4", :rname "chr2"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr1"}]]]
[3 [[{:qname "q4", :rname "chr2"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr2"}
{:qname "q4", :rname "chr2"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}]]]
[2 [[{:qname "q3", :rname "chr2"}
{:qname "q4", :rname "chr2"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}
{:qname "q3", :rname "chr2"}
{:qname "q4", :rname "chr2"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}]]]
[2 [[{:qname "q3", :rname "chr2"}
{:qname "q4", :rname "chr2"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr1"}
{:qname "q4", :rname "chr1"}
{:qname "q5", :rname "chr2"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "chr1"}]
[{:qname "q4", :rname "chr1"}]]]
[4 [[{:qname "q5", :rname "chr2"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}
{:qname "q3", :rname "chr2"}
{:qname "q4", :rname "chr3"}
{:qname "q5", :rname "chr4"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}]]]
[2 [[{:qname "q3", :rname "chr2"}
{:qname "q4", :rname "chr3"}]]]
[4 [[{:qname "q5", :rname "chr4"}]]]]

[{:qname "q1", :rname "*"}]
[[0 [[{:qname "q1", :rname "*"}]]]]

[{:qname "q1", :rname "*"}
{:qname "q2", :rname "*"}]
[[0 [[{:qname "q1", :rname "*"}
{:qname "q2", :rname "*"}]]]]

[{:qname "q1", :rname "*"}
{:qname "q2", :rname "*"}
{:qname "q3", :rname "*"}
{:qname "q4", :rname "*"}]
[[0 [[{:qname "q1", :rname "*"}
{:qname "q2", :rname "*"}
{:qname "q3", :rname "*"}]
[{:qname "q4", :rname "*"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "*"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "*"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "*"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}]]]
[2 [[{:qname "q3", :rname "*"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}
{:qname "q3", :rname "*"}
{:qname "q4", :rname "*"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr1"}]]]
[2 [[{:qname "q3", :rname "*"}
{:qname "q4", :rname "*"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "*"}
{:qname "q3", :rname "*"}
{:qname "q4", :rname "*"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "*"}]]]
[2 [[{:qname "q3", :rname "*"}
{:qname "q4", :rname "*"}]]]])
(are [input] (thrown? Exception (partition-alignments header options input))
[{:qname "q1", :rname "*"}
{:qname "q2", :rname "chr1"}]

[{:qname "q1", :rname "*"}
{:qname "q2", :rname "*"}
{:qname "q3", :rname "chr1"}])))
(testing "unsorted"
(are [input expected]
(= expected (partition-alignments {:HD {:SO "unsorted"}} options input))
[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}
{:qname "q3", :rname "chr1"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}
{:qname "q3", :rname "chr1"}]]]]

[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}
{:qname "q3", :rname "chr1"}
{:qname "q4", :rname "chr2"}]
[[0 [[{:qname "q1", :rname "chr1"}
{:qname "q2", :rname "chr2"}
{:qname "q3", :rname "chr1"}]]]
[3 [[{:qname "q4", :rname "chr2"}]]]]

[{:qname "q1", :rname "*"}
{:qname "q2", :rname "chr1"}]
[[0 [[{:qname "q1", :rname "*"}
{:qname "q2", :rname "chr1"}]]]]

[{:qname "q1", :rname "*"}
{:qname "q2", :rname "*"}
{:qname "q3", :rname "chr1"}]
[[0 [[{:qname "q1", :rname "*"}
{:qname "q2", :rname "*"}
{:qname "q3", :rname "chr1"}]]]]))))
Loading

0 comments on commit b70109a

Please sign in to comment.