-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmetrics.py
244 lines (201 loc) · 8.84 KB
/
metrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
import numexpr as ne
import numpy as np
def usability(supply, demand):
assert len(demand.shape) == 2
usability_norm = demand.size
supply = np.atleast_2d(supply)
demand_3d = demand[:, :, None]
demand_rotated = np.swapaxes(demand_3d, 1, 2)
usability = np.sum(demand_rotated >= supply, axis=(0, 2)) / usability_norm
return usability
def phenotype_usability(supply, demand):
assert len(demand.shape) == 2
usability_norm = len(demand)
supply = np.atleast_2d(supply)
demand_3d = demand[:, :, None]
demand_rotated = np.swapaxes(demand_3d, 1, 2)
usability = np.all(demand_rotated >= supply, axis=2).sum(
axis=0) / usability_norm
return usability
def pop_phenotype_usability(abd_phenotypes, frequencies):
usability = frequencies[abd_phenotypes.flatten()]
return usability
def receivability(supply, demand):
assert len(supply.shape) == 2
receivability_norm = supply.size
demand = np.atleast_2d(demand)
supply_3d = supply[:, :, None]
supply_rotated = np.swapaxes(supply_3d, 1, 2)
receivability = np.sum(supply_rotated <= demand,
axis=(0, 2)) / receivability_norm
return receivability
def sum_of_substitutions(supply, demand, weights=None):
"""
Calculate the sum of substitutions between each unit in the supply array
and each request in the demand array. The sum of substitutions is normalised
by the `weights` argument.
Parameters:
supply (array-like): The supply array. It will be converted to at least 2D.
demand (array-like): The demand array. It should be a 2D array.
weights (array-like, optional): The weights for each demand column. If None,
weights will be set to an array of ones with
the same length as the number of columns in demand.
Returns:
substitutions (ndarray): The sum of substitutions for each row in the demand array.
"""
assert len(demand.shape) == 2
weights = np.ones(demand.shape[1]) if weights is None else weights
substitution_norm = weights.sum()
supply = np.atleast_2d(supply)
demand_3d = demand[:, :, None]
demand_rotated = np.swapaxes(demand_3d, 1, 2)
substitutions = ne.evaluate(
'sum((demand_rotated > supply) * weights, axis=2)') / substitution_norm
return substitutions
def immunogenicity(supply, demand, risk):
"""
Calculate the immunogenicity metric based on supply, demand, and risk.
Parameters:
supply (ndarray): A 2D array representing the supply phenotypes.
demand (ndarray): A 2D array representing the demand phenotypes.
risk (ndarray): A 1D array representing the immune risk values.
Returns:
immunogenicity (ndarray): An array representing the immunogenicity metric.
Raises:
AssertionError: If the demand array does not have exactly 2 dimensions.
"""
assert len(demand.shape) == 2
risk_norm = risk.sum()
supply = np.atleast_2d(supply)
demand_3d = demand[:, :, None]
demand_rotated = np.swapaxes(demand_3d, 1, 2)
immunogenicity = ne.evaluate(
'sum((demand_rotated < supply) * risk, axis=2)') / risk_norm
return immunogenicity
def opportunity_cost(supply, demand, normalised=True):
Fsi = usability(supply, demand)
Fsj = usability(demand, demand)
Fsj = Fsj[:, None]
oc = Fsi - Fsj
if normalised:
oc = oc / Fsi
return oc
def major_antigen_substitution(supply, demand, population=False, pop_frequencies=None):
"""
Calculate the major antigen substitution penalty between supply and demand phenotypes.
Parameters:
supply (array-like): The supply data, representing the the supply phenotypes.
demand (array-like): The demand data, representing the demand phenotypes.
population (bool, optional): If True, compute usability using population frequencies instead of current demand.
Defaults to False.
pop_frequencies (array-like, optional): Population frequencies to be used if population is True. Defaults to None.
Returns:
mas (ndarray): The major antigen substitution matrix.
"""
if population:
Fsi = pop_phenotype_usability(supply, pop_frequencies)
Fsj = pop_phenotype_usability(demand, pop_frequencies)
else:
Fsi = phenotype_usability(supply, demand)
Fsj = phenotype_usability(demand, demand)
Fsj = Fsj[:, None]
mas = Fsi - Fsj
return mas
def fifo_discount(remaining_shelf_life, max_life=35, reqs_dates=None, scd_pat=False):
if reqs_dates is None:
discount = _fifo_discount(remaining_shelf_life)
# Penalise forecasted units so they cannot be used for today's requests
discount[remaining_shelf_life > max_life] = -1e17
discount = discount[None, :]
if np.any(scd_pat):
len_demand = len(scd_pat)
discount = np.full((len_demand, discount.size), discount)
discount[scd_pat] = 0
else:
_remaining_shelf_life = remaining_shelf_life[None,
:] - reqs_dates[:, None]
discount = _fifo_discount(_remaining_shelf_life)
allocating_to_the_past = (
remaining_shelf_life - max_life)[None, :] > reqs_dates[:, None]
will_expire = _remaining_shelf_life < 1
discount[allocating_to_the_past] = -1e24
discount[will_expire] = -1e17
if np.any(scd_pat):
discount[scd_pat] = 0
return discount
def young_blood_penalty(unit_age, max_young_blood=14, reqs_dates=None, scd_pat=True):
"""Young blood/old blood penalties and constraints.
Adds a constraint so that blood that is not 'young blood' cannot be used
for SCD patients.
Sets penalties and bonuses so that older 'young blood' is used first
up 7 days old, then between 7 and 14 days old, the penalties exponentially increase.
:param unit_age: age minus 1 of the unit in days.
:param max_young_blood: maximum age of young blood in days.
:param reqs_dates: dates of the requests.
:param scd_pat: boolean array indicating which requests are for SCD patients.
:return: penalty array.
"""
if reqs_dates is None:
penalty = _young_blood_penalty(unit_age + 1)
# Penalise non-young blood so it cannot be used
penalty[unit_age + 1 > max_young_blood] = 1e17
penalty = penalty[None, :]
if not np.all(scd_pat):
len_demand = len(scd_pat)
penalty = np.full((len_demand, penalty.size), penalty)
penalty[~scd_pat] = 0
else:
_unit_age = unit_age[None, :] + reqs_dates[:, None]
penalty = _young_blood_penalty(_unit_age + 1)
allocating_to_the_past = _unit_age < 0
penalty[allocating_to_the_past] = 1e24
will_be_old_blood = _unit_age + 1 > max_young_blood
penalty[will_be_old_blood] = 1e17
if not np.all(scd_pat):
penalty[~scd_pat] = 0
return penalty
def _fifo_discount(shelf_life):
max_abs = np.max(np.abs(shelf_life))
poss_shelf_lifes = np.hstack(
(np.arange(max_abs + 1), np.arange(-max_abs, 0)))
shelf_life_lookups = 0.5 ** (poss_shelf_lifes / 5)
discount = shelf_life_lookups[shelf_life.astype(int)]
return discount
def _young_blood_penalty(unit_age, a=7.686455, b=9.580724, c=0, d=1.1976):
max_abs = np.max(np.abs(unit_age))
poss_unit_ages = np.hstack(
(np.arange(max_abs + 1), np.arange(-max_abs, 0)))
unit_age_lookups = (-1/a * poss_unit_ages +
np.exp(poss_unit_ages - b) + c) * d
penalty = unit_age_lookups[unit_age.astype(int)]
return penalty
if __name__ == "__main__":
np.random.seed(20220228)
extreme_phens = np.array([[1, 1, 1, 1], [0, 0, 0, 0]])
supply = np.vstack((extreme_phens, np.random.randint(0, 2, (13, 4))))
demand = np.vstack((extreme_phens, np.random.randint(0, 2, (5, 4))))
print('Supply:\n', supply)
print('Demand:\n', demand)
print()
F_1 = usability(supply[0], demand)
print(f'Usability of first supply unit: {F_1[0]}.\n')
F_2 = usability(supply[1], demand)
print(f'Usability of second supply unit: {F_2[0]}.\n')
F_last = usability(supply[-1], demand)
print(f'Usability of last supply unit: {F_last[0]}.\n')
Fs = usability(supply, demand)
print(f'\nUsability:\n{Fs}')
Fd = receivability(supply, demand)
print(f'\nReceivability:\n{Fd}')
S = sum_of_substitutions(supply, demand)
print(f'\nSubstitutions normalised:\n{S}')
Imm = immunogenicity(supply, demand, np.array([2, 1, 5, 8]))
print(f'\nImmunogenicity normalised:\n{Imm[:4, :4]}')
Fsj = usability(demand, demand)
print(f'\nUsability of demanded units:\n{Fsj}')
Fsj_ex = Fsj[:, None]
OC = (Fs - Fsj_ex) / Fs
OC[OC < 0] = 10
print(f'\nOpportunity Cost:\n{OC}')
C = OC + Imm + S
print(f'\nCost Matrix:\n{C}')