forked from benedictpaten/cPecan
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcPecanModifyHmm.py
executable file
·86 lines (66 loc) · 3.86 KB
/
cPecanModifyHmm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#!/usr/bin/env python3
from cPecan.cPecanEm import Hmm, SYMBOL_NUMBER
import sys
import numpy as np
from optparse import OptionParser
from functools import reduce
"""Modifies an HMM model trained with margin align.
"""
toMatrix = lambda e : [e[SYMBOL_NUMBER*i:SYMBOL_NUMBER*(i+1)] for i in range(SYMBOL_NUMBER)]
fromMatrix = lambda e : reduce(lambda x, y : list(x) + list(y), e)
def normaliseHmmByReferenceGCContent(hmm, gcContent):
#Normalise background emission frequencies, if requested to GC% given
for state in range(hmm.stateNumber):
if state not in (2, 4): #Don't normalise GC content of insert states (as they don't have any ref bases!)
n = toMatrix(hmm.emissions[(SYMBOL_NUMBER**2) * state:(SYMBOL_NUMBER**2) * (state+1)])
hmm.emissions[(SYMBOL_NUMBER**2) * state:(SYMBOL_NUMBER**2) * (state+1)] = fromMatrix([[(n[i][j]/sum(n[i])) * (gcContent/2.0 if i in [1, 2] else (1.0-gcContent)/2.0) for j in range(SYMBOL_NUMBER)] for i in range(SYMBOL_NUMBER)]) #Normalise
def modifyHmmEmissionsByExpectedVariationRate(hmm, substitutionRate):
#Normalise background emission frequencies, if requested to GC% given
n = toMatrix([(1.0-substitutionRate) if i % SYMBOL_NUMBER == i / SYMBOL_NUMBER else substitutionRate/(SYMBOL_NUMBER-1) for i in range(SYMBOL_NUMBER**2)])
hmm.emissions[:SYMBOL_NUMBER**2] = fromMatrix(np.dot(toMatrix(hmm.emissions[:SYMBOL_NUMBER**2]), n))
def setHmmIndelEmissionsToBeFlat(hmm):
#Set indel emissions to all be flat
for state in range(1, hmm.stateNumber):
hmm.emissions[(SYMBOL_NUMBER**2) * state:(SYMBOL_NUMBER**2) * (state+1)] = [1.0/(SYMBOL_NUMBER**2)]*SYMBOL_NUMBER**2
def main():
#Parse the inputs args/options
parser = OptionParser(usage="usage: inputModel outputModel [options]",
version="%prog 0.1")
parser.add_option("--substitutionRate", dest="substitutionRate",
help="The probability per base of a difference between \
the sequenced reference and the reference the reads are aligned to. \
Value must be between 0 and 1.",
default=0.00, type=float)
parser.add_option("--gcContent", dest="gcContent",
help="The desired GC content of the model. \
By default no adjustment is made; value must be between 0 and 1.",
default=None, type=float)
parser.add_option("--setFlatIndelEmissions", dest="setFlatIndelEmissions",
help="Set all indel emissions probability to be equal regardless of base.",
default=False, action="store_true")
#Parse the options/arguments
options, args = parser.parse_args()
#Print help message if no input
if len(sys.argv) == 1:
parser.print_help()
sys.exit(0)
#Exit if the arguments are not what we expect
if len(args) != 2:
raise RuntimeError("Expected two arguments, got: %s" % " ".join(args))
#Load HMM
hmm = Hmm.loadHmm(sys.argv[1])
#Normalise background emission frequencies, if requested to GC% given
if options.gcContent != None:
if options.gcContent < 0 or options.gcContent > 1.0:
raise RuntimeError("Substitution rate is not a value between 0 and 1, got: %s" % options.gcContent)
normaliseHmmByReferenceGCContent(hmm, options.gcContent)
#Modify match emissions by proposed substitution rate
if options.substitutionRate < 0 or options.substitutionRate > 1.0:
raise RuntimeError("Substitution rate is not a value between 0 and 1, got: %s" % options.substitutionRate)
modifyHmmEmissionsByExpectedVariationRate(hmm, options.substitutionRate)
if options.setFlatIndelEmissions:
setHmmIndelEmissionsToBeFlat(hmm)
#Write out HMM
hmm.write(sys.argv[2])
if __name__ == '__main__':
main()