diff --git a/src/cljam/io/cram/encode/partitioning.clj b/src/cljam/io/cram/encode/partitioning.clj new file mode 100644 index 00000000..2390a6f1 --- /dev/null +++ b/src/cljam/io/cram/encode/partitioning.clj @@ -0,0 +1,82 @@ +(ns cljam.io.cram.encode.partitioning + (:require [cljam.io.sam.util.header :as sam.header]) + (:import [java.util ArrayList List])) + +(defn- slice-record-collector + [header {:keys [^long records-per-slice ^long min-single-ref-slice-size] + :or {records-per-slice 10000, min-single-ref-slice-size 1000}}] + (let [sort-by-coord? (= (sam.header/sort-order header) sam.header/order-coordinate) + multi-upper-bound (if sort-by-coord? + min-single-ref-slice-size + records-per-slice)] + (fn [empty-container? alns] + (let [slice-records (ArrayList.)] + (loop [ref-state nil, alns alns] + (if (seq alns) + (let [[{:keys [rname] :as aln} & more] alns + n-records (.size slice-records)] + (case ref-state + nil + (let [ref-state' (if (= rname "*") :unmapped rname)] + (.add slice-records aln) + (recur ref-state' more)) + + :unmapped + (if (< n-records records-per-slice) + (let [ref-state' (cond (= rname "*") ref-state + sort-by-coord? (throw + (ex-info + (str "Unmapped records " + "must be last") + {})) + :else :multi-ref)] + (.add slice-records aln) + (recur ref-state' more)) + [ref-state slice-records alns]) + + :multi-ref + (if (< n-records multi-upper-bound) + (do (.add slice-records aln) + (recur ref-state more)) + [ref-state slice-records alns]) + + (if (= ref-state rname) + (if (< n-records records-per-slice) + (do (.add slice-records aln) + (recur ref-state more)) + [ref-state slice-records alns]) + (if (and empty-container? (< n-records min-single-ref-slice-size)) + (do (.add slice-records aln) + (recur :multi-ref more)) + [ref-state slice-records alns])))) + [ref-state slice-records alns])))))) + +(defn with-each-container + "Partition the given alignment records into containers, which are represented + as a List of Lists of record maps, and call the function f with them." + [header options alns f] + (let [{:keys [^long slices-per-container] :or {slices-per-container 1}} options + collect-fn (slice-record-collector header options) + container-records (ArrayList.)] + (loop [ref-state nil, written 0, ready 0, alns alns] + (if (seq alns) + (let [n-slices (.size container-records) + [ref-state' ^List slice-records alns'] (collect-fn (zero? n-slices) alns)] + (if (or (= n-slices slices-per-container) + (= ref-state' :multi-ref) + (and (some? ref-state) (not= ref-state ref-state'))) + (let [written' (+ written ready)] + (when-not (.isEmpty container-records) + (f written container-records)) + (.clear container-records) + (if (= ref-state' :multi-ref) + (do (f written' (doto (ArrayList.) (.add slice-records))) + (recur nil (+ written' (.size slice-records)) 0 alns')) + (let [ready' (.size slice-records)] + (.add container-records slice-records) + (recur ref-state' written' ready' alns')))) + (let [ready' (+ ready (.size slice-records))] + (.add container-records slice-records) + (recur ref-state' written ready' alns')))) + (when-not (.isEmpty container-records) + (f written container-records)))))) diff --git a/src/cljam/io/cram/encode/record.clj b/src/cljam/io/cram/encode/record.clj index 64685d40..bcd10e4a 100644 --- a/src/cljam/io/cram/encode/record.clj +++ b/src/cljam/io/cram/encode/record.clj @@ -1,11 +1,11 @@ (ns cljam.io.cram.encode.record (:require [cljam.io.cram.encode.alignment-stats :as stats] + [cljam.io.cram.encode.tag-dict :as tag-dict] + [cljam.io.cram.seq-resolver.protocol :as resolver] [cljam.io.sam.util.cigar :as sam.cigar] [cljam.io.sam.util.flag :as sam.flag] - [cljam.io.sam.util.option :as sam.option] - [cljam.io.cram.seq-resolver.protocol :as resolver] - [cljam.io.cram.encode.tag-dict :as tag-dict]) - (:import [java.util Arrays])) + [cljam.io.sam.util.option :as sam.option]) + (:import [java.util Arrays List])) (defn- ref-index [rname->idx rname] (if (= rname "*") @@ -202,9 +202,9 @@ "Preprocesses slice records to calculate some record fields prior to record encoding that are necessary for the CRAM writer to generate some header components." - [{:keys [rname->idx subst-mat seq-resolver tag-dict-builder]} ^objects records] - (dotimes [i (alength records)] - (let [record (aget records i) + [{:keys [rname->idx subst-mat seq-resolver tag-dict-builder]} ^List records] + (dotimes [i (.size records)] + (let [record (.get records i) ;; these flag bits of CF are hard-coded at the moment: ;; - 0x01: quality scores stored as array (true) ;; - 0x02: detached (true) @@ -217,4 +217,4 @@ record' (assoc record ::flag cf ::ref-index ri ::end end ::features fs ::tags-index tags-id)] - (aset records i record')))) + (.set records i record')))) diff --git a/src/cljam/io/cram/writer.clj b/src/cljam/io/cram/writer.clj index 02aafc4c..66b2dd92 100644 --- a/src/cljam/io/cram/writer.clj +++ b/src/cljam/io/cram/writer.clj @@ -2,6 +2,7 @@ (:require [cljam.io.crai :as crai] [cljam.io.cram.encode.alignment-stats :as stats] [cljam.io.cram.encode.context :as context] + [cljam.io.cram.encode.partitioning :as partition] [cljam.io.cram.encode.record :as record] [cljam.io.cram.encode.structure :as struct] [cljam.io.cram.seq-resolver.protocol :as resolver] @@ -54,9 +55,7 @@ preservation-map seq-resolver) {:keys [ds-compressor-overrides tag-compressor-overrides]} options] - (dotimes [i (alength container-records)] - (let [slice-records (aget container-records i)] - (record/preprocess-slice-records container-ctx slice-records))) + (run! (partial record/preprocess-slice-records container-ctx) container-records) (context/finalize-container-context container-ctx ds-compressor-overrides tag-compressor-overrides))) @@ -180,8 +179,7 @@ compression-header-block (generate-compression-header-block container-ctx) container-header (generate-container-header compression-header-block slices) ^DataOutputStream out (.-stream wtr) - container-offset (.size out) - counter' (:counter (peek slices))] + container-offset (.size out)] (struct/encode-container-header out (assoc container-header :counter counter)) (.write out compression-header-block) (run! (fn [{:keys [^bytes header-block data-blocks]}] @@ -190,17 +188,11 @@ slices) (when-let [index-writer (.-index-writer wtr)] (write-index-entries index-writer container-offset container-header - slices container-records)) - counter')) - -(defn- partition-alignments [slices-per-container records-per-slice alns] - (->> alns - (partition-all records-per-slice) - (map object-array) - (partition-all slices-per-container) - (map object-array))) + slices container-records)))) (defn write-alignments "Writes all the given alignments, which is a sequence of alignment maps." - [wtr alns header] - (reduce (partial write-container wtr header) 0 (partition-alignments 1 10000 alns))) + [^CRAMWriter wtr alns header] + (partition/with-each-container header (.-options wtr) alns + (fn [counter container-records] + (write-container wtr header counter container-records))))