Skip to content

Commit

Permalink
wip: Put various encoding-related states together into context
Browse files Browse the repository at this point in the history
  • Loading branch information
athos committed Jul 30, 2024
1 parent a5a3bbb commit 265b14f
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 100 deletions.
51 changes: 51 additions & 0 deletions src/cljam/io/cram/encode/context.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
(ns cljam.io.cram.encode.context
(:require [cljam.io.cram.data-series :as ds]
[cljam.io.cram.encode.tag-dict :as tag-dict]))

(defn make-container-context
"TODO"
[cram-header preservation-map seq-resolver]
(let [rname->idx (into {}
(map-indexed (fn [i {:keys [SN]}] [SN i]))
(:SQ cram-header))
subst-mat {\A {\T 0, \G 1, \C 2, \N 3}
\T {\A 0, \G 1, \C 2, \N 3}
\G {\A 0, \T 1, \C 2, \N 3}
\C {\A 0, \T 1, \G 2, \N 3}
\N {\A 0, \T 1, \G 2, \C 3}}
tag-dict-builder (tag-dict/make-tag-dict-builder)]
{:cram-header cram-header
:rname->idx rname->idx
:preservation-map preservation-map
:subst-mat subst-mat
:seq-resolver seq-resolver
:tag-dict-builder tag-dict-builder}))

(defn finalize-container-context
"TODO"
[{:keys [tag-dict-builder] :as container-ctx}]
(let [tag-dict (tag-dict/build-tag-dict tag-dict-builder)
tag-encodings (tag-dict/build-tag-encodings tag-dict)]
(assoc container-ctx
:tag-dict tag-dict
:ds-encodings ds/default-data-series-encodings
:tag-encodings tag-encodings)))

(defn make-slice-context
"TODO"
[{:keys [ds-encodings tag-encodings] :as container-ctx}]
(let [ds-encoders (ds/build-data-series-encoders ds-encodings)
tag-encoders (ds/build-tag-encoders tag-encodings)]
(assoc container-ctx
:ds-encoders ds-encoders
:tag-encoders tag-encoders)))

(defn encoding-results
"TODO"
[{:keys [ds-encoders tag-encoders]}]
(let [ds-results (mapcat #(%) (vals ds-encoders))
tag-results (for [[_tag v] tag-encoders
[_type encoder] v
res (encoder)]
res)]
(concat ds-results tag-results)))
28 changes: 14 additions & 14 deletions src/cljam/io/cram/encode/record.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
-1
(get rname->idx rname)))

(defn- build-positional-data-encoder [cram-header {:keys [RI RL AP RG]}]
(defn- build-positional-data-encoder [{:keys [cram-header]} {:keys [RI RL AP RG]}]
(let [rg-id->idx (into {}
(map-indexed (fn [i {:keys [ID]}] [ID i]))
(:RG cram-header))]
Expand All @@ -27,7 +27,7 @@
(fn [record]
(RN (.getBytes ^String (:qname record)))))

(defn- build-mate-read-encoder [rname->idx {:keys [MF NS NP TS]}]
(defn- build-mate-read-encoder [{:keys [rname->idx]} {:keys [MF NS NP TS]}]
(fn [{:keys [^long flag rnext] :as record}]
(let [mate-flag (cond-> 0
(pos? (bit-and flag (sam.flag/encoded #{:next-reversed})))
Expand All @@ -42,7 +42,7 @@
(NP (:pnext record))
(TS (:tlen record)))))

(defn- build-auxiliary-tags-encoder [tag-dict {:keys [TL]} tag-encoders]
(defn- build-auxiliary-tags-encoder [{:keys [tag-dict tag-encoders]} {:keys [TL]}]
(let [tag-encoder (fn [{:keys [tag] :as item}]
(let [tag&type [tag (:type item)]
encoder (get-in tag-encoders tag&type)]
Expand Down Expand Up @@ -114,13 +114,14 @@
(encode-qual record QS))))

(defn- build-cram-record-encoder
[cram-header rname->idx tag-dict {:keys [BF CF] :as ds-encoders} tag-encoders stats-builder]
(let [pos-encoder (build-positional-data-encoder cram-header ds-encoders)
[{:keys [ds-encoders] :as ctx} stats-builder]
(let [pos-encoder (build-positional-data-encoder ctx ds-encoders)
name-encoder (build-read-name-encoder ds-encoders)
mate-encoder (build-mate-read-encoder rname->idx ds-encoders)
tags-encoder (build-auxiliary-tags-encoder tag-dict ds-encoders tag-encoders)
mate-encoder (build-mate-read-encoder ctx ds-encoders)
tags-encoder (build-auxiliary-tags-encoder ctx ds-encoders)
mapped-encoder (build-mapped-read-encoder ds-encoders)
unmapped-encoder (build-unmapped-read-encoder ds-encoders)]
unmapped-encoder (build-unmapped-read-encoder ds-encoders)
{:keys [BF CF]} ds-encoders]
(fn [record]
(let [bf (bit-and (long (:flag record))
(bit-not (sam.flag/encoded #{:next-reversed :next-unmapped})))]
Expand All @@ -138,12 +139,11 @@
(mapped-encoder record))))))

(defn encode-slice-records
"Encodes CRAM records in a slice all at once using the given data series encoders
and tag encoders. Returns the alignment stats for this slice."
[cram-header rname->idx tag-dict ds-encoders tag-encoders records]
"Encodes CRAM records in a slice all at once using the given encoding context.
Returns the alignment stats for this slice."
[ctx records]
(let [stats-builder (stats/make-alignment-stats-builder)
record-encoder (build-cram-record-encoder cram-header rname->idx tag-dict
ds-encoders tag-encoders stats-builder)]
record-encoder (build-cram-record-encoder ctx stats-builder)]
(run! record-encoder records)
(stats/build stats-builder)))

Expand Down Expand Up @@ -202,7 +202,7 @@
"Preprocesses slice records to calculate some record fields prior to record
encoding that are necessary for the CRAM writer to generate some header
components."
[seq-resolver rname->idx tag-dict-builder subst-mat ^objects records]
[{:keys [rname->idx subst-mat seq-resolver tag-dict-builder]} ^objects records]
(dotimes [i (alength records)]
(let [record (aget records i)
;; these flag bits of CF are hard-coded at the moment:
Expand Down
97 changes: 42 additions & 55 deletions src/cljam/io/cram/writer.clj
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
(ns cljam.io.cram.writer
(:require [cljam.io.crai :as crai]
[cljam.io.cram.data-series :as ds]
[cljam.io.cram.encode.alignment-stats :as stats]
[cljam.io.cram.encode.context :as context]
[cljam.io.cram.encode.record :as record]
[cljam.io.cram.encode.structure :as struct]
[cljam.io.cram.encode.tag-dict :as tag-dict]
[cljam.io.cram.seq-resolver.protocol :as resolver]
[cljam.io.protocols :as protocols]
[cljam.io.sam.util.header :as sam.header])
Expand Down Expand Up @@ -50,13 +49,29 @@
(struct/encode-cram-header-container (.-stream wtr) header))

(defn- preprocess-records
[seq-resolver rname->idx tag-dict-builder subst-mat ^objects container-records]
(dotimes [i (alength container-records)]
(let [slice-records (aget container-records i)]
(record/preprocess-slice-records seq-resolver rname->idx tag-dict-builder
subst-mat slice-records))))

(defn- reference-md5 [seq-resolver cram-header {:keys [^long ri ^long start ^long end]}]
[cram-header preservation-map seq-resolver ^objects container-records]
(let [container-ctx (context/make-container-context cram-header
preservation-map
seq-resolver)]
(dotimes [i (alength container-records)]
(let [slice-records (aget container-records i)]
(record/preprocess-slice-records container-ctx slice-records)))
(context/finalize-container-context container-ctx)))

(defn- generate-blocks [slice-ctx]
(->> (context/encoding-results slice-ctx)
(keep (fn [{:keys [content-id ^bytes data] :as block}]
(when (pos? (alength data))
(update block :data
#(struct/generate-block :raw 4 content-id %)))))
;; sort + dedupe by :content-id
(into (sorted-map) (map (juxt :content-id identity)))
vals
(cons {:content-id 0
:data (struct/generate-block :raw 5 0 (byte-array 0))})))

(defn- reference-md5
[{:keys [seq-resolver cram-header]} {:keys [^long ri ^long start ^long end]}]
(if (neg? ri)
(byte-array 16)
(let [chr (:SN (nth (:SQ cram-header) ri))
Expand All @@ -71,28 +86,11 @@
:bases nbases
:records nrecords})

(defn- generate-slice
[seq-resolver cram-header rname->idx counter tag-dict tag-encodings slice-records]
(let [ds-encoders (ds/build-data-series-encoders ds/default-data-series-encodings)
tag-encoders (ds/build-tag-encoders tag-encodings)
stats (record/encode-slice-records cram-header rname->idx tag-dict
ds-encoders tag-encoders slice-records)
ds-results (mapcat #(%) (vals ds-encoders))
tag-results (for [[_tag v] tag-encoders
[_type encoder] v
res (encoder)]
res)
blocks (->> (concat ds-results tag-results)
(keep (fn [{:keys [content-id ^bytes data] :as block}]
(when (pos? (alength data))
(update block :data
#(struct/generate-block :raw 4 content-id %)))))
;; sort + dedupe by :content-id
(into (sorted-map) (map (juxt :content-id identity)))
vals
(cons {:content-id 0
:data (struct/generate-block :raw 5 0 (byte-array 0))}))
ref-md5 (reference-md5 seq-resolver cram-header stats)
(defn- generate-slice [container-ctx counter slice-records]
(let [slice-ctx (context/make-slice-context container-ctx)
stats (record/encode-slice-records slice-ctx slice-records)
blocks (generate-blocks slice-ctx)
ref-md5 (reference-md5 slice-ctx stats)
header (assoc (stats->header-base stats)
:counter counter
:embedded-reference -1
Expand All @@ -108,15 +106,18 @@
(map #(alength ^bytes %))
(apply + (alength header-block)))}))

(defn- generate-slices
[seq-resolver cram-header rname->idx counter tag-dict tag-encodings container-records]
(defn- generate-slices [container-ctx counter container-records]
(loop [[slice-records & more] container-records, counter counter, acc []]
(if slice-records
(let [slice (generate-slice seq-resolver cram-header rname->idx counter
tag-dict tag-encodings slice-records)]
(let [slice (generate-slice container-ctx counter slice-records)]
(recur more (:counter slice) (conj acc slice)))
acc)))

(defn- generate-compression-header-block
^bytes [{:keys [preservation-map subst-mat tag-dict ds-encodings tag-encodings]}]
(struct/generate-compression-header-block preservation-map subst-mat tag-dict
ds-encodings tag-encodings))

(defn- generate-container-header [^bytes compression-header-block slices]
(let [stats (stats/merge-stats (map :stats slices))
container-len (->> slices
Expand Down Expand Up @@ -166,28 +167,14 @@
(crai/write-index-entries index-writer entries)))

(defn- write-container [^CRAMWriter wtr cram-header counter container-records]
(let [^DataOutputStream out (.-stream wtr)
ds-encodings ds/default-data-series-encodings
(let [preservation-map {:RN true, :AP false, :RR true}
seq-resolver (.-seq-resolver wtr)
rname->idx (into {}
(map-indexed (fn [i {:keys [SN]}] [SN i]))
(:SQ cram-header))
tag-dict-builder (tag-dict/make-tag-dict-builder)
subst-mat {\A {\T 0, \G 1, \C 2, \N 3}
\T {\A 0, \G 1, \C 2, \N 3}
\G {\A 0, \T 1, \C 2, \N 3}
\C {\A 0, \T 1, \G 2, \N 3}
\N {\A 0, \T 1, \G 2, \C 3}}
_ (preprocess-records seq-resolver rname->idx tag-dict-builder
subst-mat container-records)
tag-dict (tag-dict/build-tag-dict tag-dict-builder)
tag-encodings (tag-dict/build-tag-encodings tag-dict)
slices (generate-slices seq-resolver cram-header rname->idx
counter tag-dict tag-encodings container-records)
compression-header-block (struct/generate-compression-header-block
{:RN true, :AP false, :RR true}
subst-mat tag-dict ds-encodings tag-encodings)
container-ctx (preprocess-records cram-header preservation-map seq-resolver
container-records)
slices (generate-slices container-ctx counter container-records)
compression-header-block (generate-compression-header-block container-ctx)
container-header (generate-container-header compression-header-block slices)
^DataOutputStream out (.-stream wtr)
container-offset (.size out)
counter' (:counter (peek slices))]
(struct/encode-container-header out (assoc container-header :counter counter))
Expand Down
50 changes: 19 additions & 31 deletions test/cljam/io/cram/encode/record_test.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
(ns cljam.io.cram.encode.record-test
(:require [cljam.io.cram.data-series :as ds]
(:require [cljam.io.cram.encode.context :as context]
[cljam.io.cram.encode.record :as record]
[cljam.io.cram.encode.tag-dict :as tag-dict]
[cljam.io.cram.seq-resolver.protocol :as resolver]
[cljam.io.sequence :as cseq]
[cljam.test-common :as common]
Expand Down Expand Up @@ -70,8 +69,8 @@
4]))

(deftest preprocess-slice-records-test
(let [rname->idx {"ref" 0}
tag-dict-builder (tag-dict/make-tag-dict-builder)
(let [cram-header {:SQ [{:SN "ref"}]}
container-ctx (context/make-container-context cram-header {} test-seq-resolver)
records (object-array
[{:rname "ref", :pos 1, :cigar "5M", :seq "AGAAT", :qual "HFHHH"
:options [{:RG {:type "Z", :value "rg001"}}
Expand All @@ -91,8 +90,7 @@
{:NM {:type "c", :value 2}}]}
{:rname "*", :pos 0, :cigar "*", :seq "CTGTG", :qual "AEEEE"
:options []}])]
(record/preprocess-slice-records test-seq-resolver rname->idx
tag-dict-builder subst-mat records)
(record/preprocess-slice-records container-ctx records)
(is (= [{:rname "ref", :pos 1, :cigar "5M", :seq "AGAAT", :qual "HFHHH"
:options [{:RG {:type "Z", :value "rg001"}}
{:MD {:type "Z", :value "2C2"}}
Expand Down Expand Up @@ -132,11 +130,7 @@
:RG
[{:ID "rg001"}
{:ID "rg002"}]}
rname->idx (into {}
(map-indexed (fn [i {:keys [SN]}] [SN i]))
(:SQ cram-header))
tag-dict-builder (tag-dict/make-tag-dict-builder)
ds-encoders (ds/build-data-series-encoders ds/default-data-series-encodings)
container-ctx (context/make-container-context cram-header {} test-seq-resolver)
records (object-array
[{:qname "q001", :flag 99, :rname "ref", :pos 1, :end 5, :mapq 0,
:cigar "5M", :rnext "=", :pnext 151, :tlen 150, :seq "AGAAT", :qual "HFHHH"
Expand All @@ -161,15 +155,13 @@
{:qname "q005", :flag 73, :rname "ref", :pos 20, :end 24, :mapq 0,
:cigar "5M", :rnext "*", :pnext 0, :tlen 0, :seq "CTGTG", :qual "AEEEE"
:options []}])
_ (record/preprocess-slice-records test-seq-resolver rname->idx
tag-dict-builder subst-mat records)
tag-dict (tag-dict/build-tag-dict tag-dict-builder)
tag-encodings (tag-dict/build-tag-encodings tag-dict)
tag-encoders (ds/build-tag-encoders tag-encodings)
stats (record/encode-slice-records cram-header rname->idx tag-dict
ds-encoders tag-encoders records)
ds-res (walk/prewalk #(if (fn? %) (%) %) ds-encoders)
tag-res (walk/prewalk #(if (fn? %) (%) %) tag-encoders)]
slice-ctx (-> container-ctx
(doto (record/preprocess-slice-records records))
context/finalize-container-context
context/make-slice-context)
stats (record/encode-slice-records slice-ctx records)
ds-res (walk/prewalk #(if (fn? %) (%) %) (:ds-encoders slice-ctx))
tag-res (walk/prewalk #(if (fn? %) (%) %) (:tag-encoders slice-ctx))]
(is (= {:ri 0, :start 1, :end 24, :nbases 25, :nrecords 5}
(into {} stats)))

Expand Down Expand Up @@ -321,11 +313,7 @@
(let [cram-header {:SQ
[{:SN "ref"}
{:SN "ref2"}]}
rname->idx (into {}
(map-indexed (fn [i {:keys [SN]}] [SN i]))
(:SQ cram-header))
tag-dict-builder (tag-dict/make-tag-dict-builder)
ds-encoders (ds/build-data-series-encoders ds/default-data-series-encodings)
container-ctx (context/make-container-context cram-header {} test-seq-resolver)
records (object-array
[{:qname "q001", :flag 77, :rname "*", :pos 0, :end 0, :mapq 0,
:cigar "*", :rnext "*", :pnext 0, :tlen 0, :seq "AATCC", :qual "CCFFF"
Expand All @@ -342,12 +330,12 @@
{:qname "q003", :flag 77, :rname "*", :pos 0, :end 0, :mapq 0,
:cigar "*", :rnext "*", :pnext 0, :tlen 0, :seq "GCACA", :qual "BCCFD"
:options []}])
_ (record/preprocess-slice-records test-seq-resolver rname->idx
tag-dict-builder subst-mat records)
tag-dict (tag-dict/build-tag-dict tag-dict-builder)
stats (record/encode-slice-records cram-header rname->idx tag-dict
ds-encoders {} records)
ds-res (walk/prewalk #(if (fn? %) (%) %) ds-encoders)]
slice-ctx (-> container-ctx
(doto (record/preprocess-slice-records records))
context/finalize-container-context
context/make-slice-context)
stats (record/encode-slice-records slice-ctx records)
ds-res (walk/prewalk #(if (fn? %) (%) %) (:ds-encoders slice-ctx))]
(is (= {:ri -1, :start 0, :end 0, :nbases 25, :nrecords 5}
(into {} stats)))

Expand Down

0 comments on commit 265b14f

Please sign in to comment.