# util.py
import json
import os
from typing import Generator, List, Union

import numpy as np
import pandas as pd
from numpy import ndarray

from BSCSimulator.bloodgroups import ANTIGENS, bloodgroup_frequency


def create_pop_phenotype_table(output_file, donor=False):
    """Build the population phenotype table for the 17 major antigens and write it to a TSV file.

    :param output_file: Path of the tab-separated file to write the table to.
    :param donor: If True, use donor frequencies (passed through to ``bloodgroup_frequency``).
    :return: A pandas DataFrame with the 17 antigen indicator columns, the decimal
        encoding of each phenotype, and the Black and White population frequencies.
    """
    phen, freq_whi = bloodgroup_frequency(
        np.full(17, True), np.array([[1], [0]]), donor=donor)
    _, freq_bla = bloodgroup_frequency(
        np.full(17, True), np.array([[0], [1]]), donor=donor)
    antigens = ["A", "B", "D", "C", "c", "E", "e", "K",
                "k", "Fya", "Fyb", "Jka", "Jkb", "M", "N", "S", "s"]
    df = pd.DataFrame(phen, columns=antigens)
    df['Black_frequencies'] = freq_bla
    df['White_frequencies'] = freq_whi
    # Encode each binary phenotype vector as a single integer,
    # with the first antigen ("A") as the most significant bit.
    pows = np.arange(len(antigens) - 1, -1, -1)
    ints = 2 ** pows
    antints = phen.dot(ints[:, None])
    df['Phenotype_decimal'] = antints
    df2 = df[antigens + ['Phenotype_decimal']].copy()
    df2 = df2.astype(int)
    df2['Black_frequencies'] = freq_bla
    df2['White_frequencies'] = freq_whi
    freq_file = os.path.realpath(output_file)
    df2.to_csv(freq_file, sep='\t', index=False)
    return df2
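
# Example (illustrative): with the 17-antigen order above, "A" is the most
# significant bit (weight 2**16) and "s" the least significant (weight 2**0),
# so a phenotype that is positive only for A and D has
# Phenotype_decimal == 2**16 + 2**14 == 81920.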


def population_phenotype(config_file, black_pop_ratio) -> pd.DataFrame:
    """
    Generate a DataFrame of phenotype decimal values and their frequencies
    for the given population mix.

    Args:
        config_file (str): Path to the JSON configuration file containing antigen information.
        black_pop_ratio (float): Ratio of the Black population in the total population.

    Returns:
        df (DataFrame): A DataFrame with two columns:
            - 'phenotype_decimal': Decimal representation of binary phenotypes.
            - 'frequencies': Frequencies of the corresponding phenotypes.
    """
    pop_ratio = np.array([[1 - black_pop_ratio], [black_pop_ratio]])
    with open(config_file) as json_file:
        antigen_info = json.load(json_file)
    phen, freq_pop, antigens = bloodgroup_frequency(antigen_info, pop_ratio)
    # Encode each binary phenotype vector as a single integer,
    # with the first antigen as the most significant bit.
    pows = np.arange(len(antigens) - 1, -1, -1)
    ints = 2 ** pows
    antints = phen.dot(ints[:, None])
    df = pd.DataFrame(antints, columns=['phenotype_decimal']).astype(int)
    df['frequencies'] = freq_pop
    return df
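
# Usage sketch (illustrative; the JSON path below is an example, not a fixed API):
# phenotypes for a population that is 30% Black and 70% White could be built with
# >>> df = population_phenotype('data/bloodgroup_frequencies/antigens.json', 0.3)
# >>> df.columns.tolist()
# ['phenotype_decimal', 'frequencies']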


def dummy_population_phenotypes(config_file) -> pd.DataFrame:
    """Create a dummy population phenotype table.

    :param config_file: Path to the TSV file containing the major blood group distribution in the population.
    :return: A pandas DataFrame containing the population phenotype table.
    """
    df = pd.read_csv(config_file, sep='\t')
    df = df.drop(columns=['ABOD'])
    return df


def pad_abd_phenotypes(df, padding_length):
    """
    Pad (in binary) the 'phenotype_decimal' column of the given DataFrame by a specified length.

    This function multiplies the 'phenotype_decimal' column by 2 raised to the power of
    the padding length, then adds (2 raised to the power of the padding length) minus 1
    to the result. This effectively appends ``padding_length`` 1-bits to the binary
    representation of each 'phenotype_decimal' value.

    Parameters:
        df (DataFrame): The DataFrame containing the 'phenotype_decimal' column to be padded.
        padding_length (int): The number of bits by which to pad the 'phenotype_decimal' values.

    Returns:
        df (DataFrame): The DataFrame with the padded 'phenotype_decimal' column.
    """
    df.phenotype_decimal *= 2 ** padding_length
    df.phenotype_decimal += 2 ** padding_length - 1
    return df
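
# Example (illustrative): with phenotype_decimal == 0b101 (5) and
# padding_length == 3, the value becomes 5 * 2**3 + (2**3 - 1) == 47,
# i.e. 0b101111 -- the original bits followed by three 1-bits.
# >>> int(pad_abd_phenotypes(pd.DataFrame({'phenotype_decimal': [0b101]}), 3).phenotype_decimal[0])
# 47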


def abd_usability(non_scd_freqs: ndarray, scd_requsts_ratio=330/3_500,
                  black_pop_ratio_in_scd=1.0) -> ndarray:
    """Calculate the usability of ABD blood groups.

    :param non_scd_freqs: A numpy array containing the frequencies of ABD blood groups in the non-SCD population.
    :param scd_requsts_ratio: The ratio of SCD units requested to the total number of units requested.
    :param float black_pop_ratio_in_scd: The ratio of the Black population among SCD patients.
    :return: A numpy array containing the usability of ABD blood groups.
    """
    scd_phens = population_phenotype(
        'data/bloodgroup_frequencies/usability_blood_groups.json', black_pop_ratio_in_scd)
    scd_freqs = scd_phens.frequencies.to_numpy().flatten()
    # non_scd_freqs = np.array([])
    # Demand-weighted mix of SCD and non-SCD frequencies, renormalised to sum to one.
    all_freqs = (1 - scd_requsts_ratio) * non_scd_freqs + \
        scd_requsts_ratio * scd_freqs
    all_freqs = all_freqs / all_freqs.sum()
    # Row i is a boolean mask over the ABD groups that a unit of type i can be issued to.
    compatibility = [[True] * 8,                       # O-
                     [False, True] * 4,                # O+
                     [False, False, True, True] * 2,   # B-
                     [False, False, False, True] * 2,  # B+
                     [False] * 4 + [True] * 4,         # A-
                     [False] * 4 + [False, True] * 2,  # A+
                     [False] * 6 + [True] * 2,         # AB-
                     [False] * 6 + [False, True]]      # AB+
    usability = [all_freqs[compatibility[i]].sum() for i in range(8)]
    return np.array(usability)
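
# Example (illustrative): compatibility[0] is all True, so the usability of
# O- units is all_freqs.sum() == 1.0 -- an O- unit can be issued against any
# request. At the other extreme, AB+ units can only be issued against AB+
# requests, so their usability equals the mixed frequency of AB+.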


def list_of_permutations(domain_list) -> list:
    """
    Generate a list of all possible permutations from a list of domains.

    Args:
        domain_list (list): A list of lists, where each sublist represents a domain of possible values.

    Returns:
        list: A list of lists, where each sublist is a unique permutation of values from the input domains.

    Example:
        >>> list_of_permutations([[1, 2], ['a', 'b']])
        [[1, 'a'], [1, 'b'], [2, 'a'], [2, 'b']]
    """
    prototype = []
    num_permutations = 1
    divisors = []
    len_dl = len(domain_list)
    # Build the running product of domain sizes from the last domain to the
    # first; divisors[m] is the size of the block of consecutive indices over
    # which the m-th domain from the end keeps the same value.
    for i in range(len_dl - 1, -1, -1):
        domain = domain_list[i]
        prototype = prototype + [domain[0]]
        len_d = len(domain)
        num_permutations = num_permutations * len_d
        divisors.append(num_permutations / len_d)
    permutation_list = []
    # Decode each index i as a mixed-radix number whose digits select one
    # value from each domain.
    for i in range(num_permutations):
        permutation = []
        for j in range(len_dl):
            domain = domain_list[j]
            k = int(i / divisors[(len_dl - 1 - j)]) % len(domain)
            permutation.append(domain[k])
        permutation_list.append(permutation)
    return permutation_list
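
# Example (illustrative): for domain_list = [[1, 2], ['a', 'b', 'c']] the first
# loop gives num_permutations == 6 and divisors == [1.0, 3.0], so index i picks
# domain_list[0][int(i / 3) % 2] and domain_list[1][i % 3], producing
# [[1, 'a'], [1, 'b'], [1, 'c'], [2, 'a'], [2, 'b'], [2, 'c']].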


def _normalize(Y, normalization_type='stats'):
    """Normalize the vector Y using statistics or its range.

    :param Y: Row or column vector that you want to normalize.
    :param normalization_type: String specifying the kind of normalization
        to use. Options are 'stats' to use mean and standard deviation,
        or 'maxmin' to use the range of function values.
    :return Y_normalized: The normalized vector.
    """
    Y = np.asarray(Y, dtype=float)
    if np.max(Y.shape) != Y.size:
        raise NotImplementedError('Only 1-dimensional arrays are supported.')
    # Only normalize with non-null sdev (avoid dividing by zero). For only one
    # data point both std and ptp return 0.
    if normalization_type == 'stats':
        Y_norm = Y - Y.mean()
        std = Y.std()
        if std > 0:
            Y_norm /= std
    elif normalization_type == 'maxmin':
        Y_norm = Y - Y.min()
        y_range = np.ptp(Y)
        if y_range > 0:
            Y_norm /= y_range
            # A range of [-1, 1] is more natural for a zero-mean GP
            Y_norm = 2 * (Y_norm - 0.5)
    else:
        raise ValueError(
            'Unknown normalization type: {}'.format(normalization_type))
    return Y_norm
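
# Example (illustrative): for Y = [1, 2, 3],
#   _normalize(Y, 'stats')  -> approximately [-1.2247, 0.0, 1.2247]
#   _normalize(Y, 'maxmin') -> [-1.0, 0.0, 1.0]
# A constant vector comes back as all zeros under both options, since its
# standard deviation and range are both zero and no rescaling is applied.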


def normalize(
        y: Union[ndarray, List],
        norm_type='stats', *, upper=1, lower=-1, anchor_upper=None, anchor_lower=None) -> ndarray:
    """
    Normalize the array using its statistics or its range.

    Normalisation takes place along each column.

    :param y: 2-D array-like data that you want to normalize.
    :param norm_type: String specifying the kind of normalisation to use.
        Options are 'stats' to use mean and standard deviation, or
        'maxmin' to use the range of function values so that elements
        lie in the range [``lower``, ``upper``].
    :param upper: Upper bound on normalised values when using norm_type='maxmin'.
    :param lower: Lower bound on normalised values when using norm_type='maxmin'.
    :param anchor_upper: Fixed upper bound on values in ``y``.
        If specified, a row vector with values equal to ``anchor_upper`` is
        appended to ``y`` before normalisation and removed after.
    :param anchor_lower: Fixed lower bound on values in ``y``.
        If specified, a row vector with values equal to ``anchor_lower`` is
        appended to ``y`` before normalisation and removed after.
    :return: A normalized numpy array.

    Example
    ---------
    >>> y = np.array([[1, 3, 5, 5], [1, 4, 6, 3], [1, 6, 9, 0]])
    >>> y
    array([[1, 3, 5, 5],
           [1, 4, 6, 3],
           [1, 6, 9, 0]])
    >>> normalize(y, 'maxmin', upper=1, lower=0)
    array([[0.5       , 0.        , 0.        , 1.        ],
           [0.5       , 0.33333333, 0.25      , 0.6       ],
           [0.5       , 1.        , 1.        , 0.        ]])
    >>> normalize(y)
    array([[ 0.        , -1.06904497, -0.98058068,  1.13554995],
           [ 0.        , -0.26726124, -0.39223227,  0.16222142],
           [ 0.        ,  1.33630621,  1.37281295, -1.29777137]])
    >>> normalize(y, 'maxmin')
    array([[ 0.        , -1.        , -1.        ,  1.        ],
           [ 0.        , -0.33333333, -0.5       ,  0.2       ],
           [ 0.        ,  1.        ,  1.        , -1.        ]])
    """
    y_arr = np.asarray(y, dtype=float)
    anchors = 0
    if anchor_upper is not None:
        if len(y_arr.shape) == 1:
            y_arr = np.append(y_arr, anchor_upper)
        else:
            anchor_vec = np.ones((1, y_arr.shape[1])) * anchor_upper
            y_arr = np.vstack((y_arr, anchor_vec))
        anchors += 1
    if anchor_lower is not None:
        if len(y_arr.shape) == 1:
            y_arr = np.append(y_arr, anchor_lower)
        else:
            anchor_vec = np.ones((1, y_arr.shape[1])) * anchor_lower
            y_arr = np.vstack((y_arr, anchor_vec))
        anchors += 1
    if norm_type == 'maxmin' and upper <= lower:
        raise ValueError('Upper bound must be greater than lower bound.')
    if len(y_arr.shape) == 1:
        # 1-D input: delegate to the vector helper, then rescale if needed.
        y_norm = _normalize(y_arr, norm_type)
        if norm_type == 'maxmin':
            _hr = (upper - lower) / 2
            _mean = (upper + lower) / 2
            y_norm = _hr * y_norm + _mean
        y_norm = y_norm[:len(y_norm) - anchors]
        return y_norm
    if norm_type == 'stats':
        # Column-wise standardisation; columns with zero standard deviation
        # are only centred.
        y_mean = y_arr.mean(axis=0)
        y_norm = y_arr - y_mean
        y_std = y_arr.std(axis=0)
        gt_zero = np.flatnonzero(y_std)
        y_norm[:, gt_zero] /= y_std[gt_zero]
    elif norm_type == 'maxmin':
        # Column-wise rescaling into [lower, upper]; constant columns are
        # mapped to the midpoint of the range.
        y_min = y_arr.min(axis=0)
        y_norm = y_arr - y_min
        y_range = np.ptp(y_arr, axis=0)
        gt_zero = np.flatnonzero(y_range)
        eq_zero = np.flatnonzero(np.sum(y_norm, axis=0) == 0)
        _range = upper - lower
        y_norm[:, gt_zero] /= y_range[gt_zero]
        y_norm[:, eq_zero] = 0.5
        y_norm = _range * y_norm + lower
    else:
        raise ValueError('Unknown normalization type: {}'.format(norm_type))
    y_norm = y_norm[:len(y_norm) - anchors]
    return y_norm
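
# Usage sketch (illustrative): anchor_upper / anchor_lower pin the 'maxmin'
# normalisation to fixed bounds instead of the observed column min and max.
# >>> normalize(np.array([[2.0], [4.0]]), 'maxmin', upper=1, lower=0,
# ...           anchor_upper=10, anchor_lower=0)
# array([[0.2],
#        [0.4]])
# The anchor rows are appended before scaling and stripped from the output.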