# calc_pixel_pairs.py
"""
Accumulate flux products from pixel pairs to bins.
Uses a Python/C API methods for the actual calculation.
There are 2 modes of operation:
- mean: sum of weighted flux products, sum of weights
- median: a weighted histogram of flux products.
Also contains a slower reference implementation in pure Python (mean only)
"""
import lyacorr_cython_helper
from collections import namedtuple
import numpy as np
import bins_3d
import bins_3d_with_group_id
import common_settings
import flux_histogram_bins
import significant_qso_pairs
from data_access.numpy_spectrum_container import NpSpectrumContainer
from flux_accumulator import AccumulatorBase
settings = common_settings.Settings() # type: common_settings.Settings
NUM_BINS_X = 50
NUM_BINS_Y = 50
NUM_BINS_Z = settings.get_num_distance_slices()
MAX_Z_RESOLUTION = 1000
# enum-like selector for the accumulation mode; callers pass one of its field
# descriptors, e.g. accumulator_types.mean
accumulator_types = namedtuple('accumulator_type', ['mean', 'mean_subsample', 'histogram'])
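# Illustrative usage sketch (assumptions: a ComovingDistance instance `cd` from the
# project's comoving_distance module, and `pairs`, `pairs_angles`, `delta_t_file`
# prepared elsewhere in the pipeline; the separation limits are made-up numbers):
#
#     pixel_pairs = PixelPairs(cd, max_transverse_separation=200.,
#                              max_parallel_separation=200.,
#                              accumulator_type=accumulator_types.mean)
#     bins = pixel_pairs.add_qso_pairs_to_bins(pairs, pairs_angles, delta_t_file)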
class PreAllocMatrices:
def __init__(self, z_res):
self.z_res = z_res
self.m1 = np.zeros([z_res, z_res], dtype='float32')
self.m2 = np.zeros([z_res, z_res], dtype='float32')
self.m4 = np.zeros([z_res, z_res], dtype='float32')
self.m5 = np.zeros([z_res, z_res], dtype='float32')
self.m6 = np.zeros([z_res, z_res], dtype='float32')
self.m7 = np.zeros([z_res, z_res], dtype='float32')
self.v1 = np.zeros(z_res, dtype='float32')
self.v2 = np.zeros(z_res, dtype='float32')
self.v3 = np.zeros(z_res, dtype='float32')
self.v4 = np.zeros(z_res, dtype='float32')
self.v5 = np.zeros(z_res, dtype='float32')
self.v6 = np.zeros(z_res, dtype='float32')
self.mask1 = np.zeros([z_res, z_res], dtype=bool)
self.mask2 = np.zeros([z_res, z_res], dtype=bool)
    def zero(self):
        self.m1.fill(0)
        self.m2.fill(0)
        self.m4.fill(0)
        self.m5.fill(0)
        self.m6.fill(0)
        self.m7.fill(0)
        self.v1.fill(0)
        self.v2.fill(0)
        self.v3.fill(0)
        self.v4.fill(0)
        self.v5.fill(0)
        self.v6.fill(0)
        self.mask1.fill(False)
        self.mask2.fill(False)
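# Note on the class above: it is a simple buffer pool. The matrices are allocated once
# at MAX_Z_RESOLUTION and the pair-finding code below takes sliced views of them
# (e.g. m2[:y, :x]) instead of allocating fresh arrays for every QSO pair, which
# avoids repeated large allocations in the inner loop.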
class PixelPairs:
def __init__(self, cd, max_transverse_separation, max_parallel_separation, accumulator_type):
"""
initialize persistent objects
:type cd: comoving_distance.ComovingDistance
:type max_transverse_separation: float
"""
self.cd = cd
self.max_transverse_separation = max_transverse_separation
self.max_parallel_separation = max_parallel_separation
self.pre_alloc_matrices = PreAllocMatrices(MAX_Z_RESOLUTION)
self.significant_qso_pairs = significant_qso_pairs.SignificantQSOPairs()
self.accumulator_type = accumulator_type
self.min_distance = cd.comoving_distance(settings.get_min_forest_redshift())
self.max_distance = cd.comoving_distance(settings.get_max_forest_redshift())
def find_nearby_pixels2(self, accumulator, qso_angle,
spec1_index, spec2_index, delta_t_file):
"""
        Find all pixel pairs in QSO1, QSO2 whose separation fits within the accumulator's bin ranges.
        This is a slower reference implementation in pure Python+NumPy.
:type accumulator: AccumulatorBase
:type qso_angle: float64
:type spec1_index: int
:type spec2_index: int
:type delta_t_file: NpSpectrumContainer
:return:
"""
# Note: not using pre_alloc_matrices.zero()
# the maximum distance that can be stored in the accumulator
max_parallel_separation = np.float32(accumulator.get_ranges()[1, 0])
max_transverse_separation = np.float32(accumulator.get_ranges()[1, 1])
spec1_z = delta_t_file.get_wavelength(spec1_index)
spec2_z = delta_t_file.get_wavelength(spec2_index)
if not (spec1_z.size and spec2_z.size):
return
assert spec1_z.min() > 0, "z out of range: {0}, spec index {1}".format(spec1_z.min(), spec1_index)
assert spec2_z.min() > 0, "z out of range: {0}, spec index {1}".format(spec2_z.min(), spec2_index)
# Note: throughout this method, "flux" means delta_f
spec1_flux = delta_t_file.get_flux(spec1_index)
spec1_distances = self.cd.fast_comoving_distance(spec1_z)
spec2_flux = delta_t_file.get_flux(spec2_index)
# print(spec2_flux)
spec2_distances = self.cd.fast_comoving_distance(spec2_z)
# get pre-calculated weights for each QSO
qso1_weights = delta_t_file.get_ivar(spec1_index)
qso2_weights = delta_t_file.get_ivar(spec2_index)
# if the parallel distance between forests is too large, they will not form pairs.
if (spec1_distances[0] > spec2_distances[-1] + max_parallel_separation) or \
(spec2_distances[0] > spec1_distances[-1] + max_parallel_separation):
return
# create matrices with first dimension of spec1 data points,
# second dimension of spec2 data points
y = spec1_distances.size
x = spec2_distances.size
# assign variables to pre-allocated memory
flux_products = self.pre_alloc_matrices.m2[:y, :x]
mask_matrix_parallel = self.pre_alloc_matrices.mask1[:y, :x]
mask_matrix_final = self.pre_alloc_matrices.mask2[:y, :x]
r_parallel = self.pre_alloc_matrices.m4[:y, :x]
r_transverse = self.pre_alloc_matrices.m5[:y, :x]
spec1_distances_sq = self.pre_alloc_matrices.v1[:y]
spec2_distances_sq = self.pre_alloc_matrices.v2[:x]
z_weights = self.pre_alloc_matrices.m6[:y, :x]
mean_distance = self.pre_alloc_matrices.m7[:y, :x]
np.square(spec1_distances, out=spec1_distances_sq)
np.square(spec2_distances, out=spec2_distances_sq)
# a matrix of flux products
np.outer(spec1_flux, spec2_flux, out=flux_products)
# r|| = abs(r1 - r2)
np.subtract(spec1_distances[:, None], spec2_distances, out=r_parallel)
np.abs(r_parallel, out=r_parallel)
        # r_perp = (r1 + r2)/2 * qso_angle
np.add(spec1_distances[:, None], spec2_distances, out=mean_distance)
np.multiply(mean_distance, 1. / 2, out=mean_distance)
np.multiply(mean_distance, qso_angle, out=r_transverse)
# mask all elements that are too far apart
np.less(r_parallel, max_parallel_separation, out=mask_matrix_parallel)
np.less(r_transverse, max_transverse_separation, out=mask_matrix_final)
np.logical_and(mask_matrix_parallel, mask_matrix_final, mask_matrix_final)
np.outer(qso1_weights, qso2_weights, out=z_weights)
# multiply fluxes by their respective weights
np.multiply(flux_products, z_weights, out=flux_products)
assert not np.isnan(flux_products).any()
assert not np.isnan(z_weights).any()
return accumulator.add_array_with_mask(flux_products,
r_parallel,
r_transverse,
mean_distance,
mask_matrix_final,
z_weights)
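    # Worked example of the separation geometry used above (illustrative numbers):
    # for two pixels at comoving distances r1 = 3000 and r2 = 3080 (in the units
    # returned by ComovingDistance), seen at an angular separation of 0.01 rad,
    #     r_parallel   = |r1 - r2|             = 80
    #     r_transverse = (r1 + r2) / 2 * 0.01  = 30.4
    # The pair contributes only if both values are below the accumulator's ranges,
    # and its flux product is weighted by the product of the two pixels' inverse variances.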
def find_nearby_pixels(self, accumulator, qso_angle,
spec1_index, spec2_index, delta_t_file,
group_id=0):
"""
        Find all pixel pairs in QSO1, QSO2 whose separation fits within the accumulator's bin ranges.
        This is a faster implementation that uses a Python/C API module.
:type accumulator: AccumulatorBase
:type qso_angle: float64
:type spec1_index: int64
:type spec2_index: int64
:type delta_t_file: NpSpectrumContainer
:type group_id: int64
:return:
"""
# Note: not using pre_alloc_matrices.zero()
# the maximum distance that can be stored in the accumulator
range_parallel = np.float32(accumulator.get_ranges()[1, 0])
spec1_z = delta_t_file.get_wavelength(spec1_index)
spec2_z = delta_t_file.get_wavelength(spec2_index)
if not (spec1_z.size and spec2_z.size):
return
assert spec1_z.min() > 0, "z out of range: {0}, spec index {1}".format(spec1_z.min(), spec1_index)
assert spec2_z.min() > 0, "z out of range: {0}, spec index {1}".format(spec2_z.min(), spec2_index)
# Note: throughout this method, "flux" means delta_f
spec1_flux = delta_t_file.get_flux(spec1_index)
spec1_distances = self.cd.fast_comoving_distance(spec1_z)
spec2_flux = delta_t_file.get_flux(spec2_index)
# print(spec2_flux)
spec2_distances = self.cd.fast_comoving_distance(spec2_z)
# get pre-calculated weights for each QSO
qso1_weights = delta_t_file.get_ivar(spec1_index)
qso2_weights = delta_t_file.get_ivar(spec2_index)
# if the parallel distance between forests is too large, they will not form pairs.
if (spec1_distances[0] > spec2_distances[-1] + range_parallel) or \
(spec2_distances[0] > spec1_distances[-1] + range_parallel):
return
if self.accumulator_type == accumulator_types.mean:
ar = lyacorr_cython_helper.bin_pixel_pairs(spec1_distances, spec2_distances,
spec1_flux, spec2_flux,
qso1_weights, qso2_weights,
qso_angle,
accumulator.get_dims(),
accumulator.get_ranges())
local_bins = bins_3d.Bins3D(accumulator.get_dims(),
accumulator.get_ranges(), ar_existing_data=ar)
flux_contribution = np.nanmax(np.abs(local_bins.ar_flux))
self.significant_qso_pairs.add_if_larger(spec1_index, spec2_index, flux_contribution)
accumulator += local_bins
elif self.accumulator_type == accumulator_types.mean_subsample:
# local_bins = bins_3d_with_group_id.Bins3DWithGroupID(
# accumulator.get_dims(), accumulator.get_ranges()) # type: bins_3d_with_group_id.Bins3DWithGroupID
# retrieve or create storage for this array
assert isinstance(accumulator, bins_3d_with_group_id.Bins3DWithGroupID)
ar_view = accumulator.get_group_view(group_id).ar_data
lyacorr_cython_helper.bin_pixel_pairs(spec1_distances, spec2_distances,
spec1_flux, spec2_flux,
qso1_weights, qso2_weights,
qso_angle,
accumulator.get_dims(),
accumulator.get_ranges(),
ar_view)
# disabled for performance reasons
# flux_contribution = np.nanmax(np.abs(local_bins.dict_bins_3d_data[group_id].ar_flux))
# self.significant_qso_pairs.add_if_larger(spec1_index, spec2_index, flux_contribution)
# accumulator += local_bins
elif self.accumulator_type == accumulator_types.histogram:
assert isinstance(accumulator, flux_histogram_bins.FluxHistogramBins)
# TODO: try to avoid using implementation details of the accumulator interface
accumulator.pair_count += lyacorr_cython_helper.bin_pixel_pairs_histogram(
spec1_distances, spec2_distances,
spec1_flux, spec2_flux, qso1_weights, qso2_weights,
qso_angle,
accumulator.get_dims(),
accumulator.get_ranges(),
accumulator.ar_flux)
pass
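    # Note on the accumulation modes handled above: in the 'mean' mode each bin ends up
    # holding the sum of weighted flux products and the sum of weights (see the module
    # docstring), so the weighted mean for a bin is simply the ratio of the two:
    #     mean_flux_product(bin) = sum(w1*w2 * delta_f1*delta_f2) / sum(w1*w2)
    # The 'mean_subsample' mode keeps the same sums separately per group_id, and the
    # 'histogram' mode instead fills a weighted histogram of the flux products.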
def apply_to_flux_pairs(self, pairs, pairs_angles, delta_t_file, accumulator):
"""
        :type pairs: np.ndarray
        :type pairs_angles: np.ndarray
:type delta_t_file: NpSpectrumContainer
:type accumulator: AccumulatorBase
:rtype: AccumulatorBase
"""
n = 0
for spec1_index, spec2_index, group_id, angle_index in pairs:
qso_angle = pairs_angles[angle_index]
self.find_nearby_pixels(accumulator, qso_angle,
spec1_index, spec2_index, delta_t_file,
group_id=group_id)
n += 1
return accumulator
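    # Illustrative sketch of the expected inputs (column layout inferred from the loop
    # above; the numbers are made up):
    #     pairs        = np.array([[12, 847, 3, 0],
    #                              [12, 911, 3, 1]])   # spec1_index, spec2_index, group_id, angle_index
    #     pairs_angles = np.array([0.002, 0.0035])     # QSO-QSO angular separations (radians,
    #                                                  # given the small-angle use in find_nearby_pixels2)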
def create_bins(self):
"""
:rtype: AccumulatorBase
"""
pair_separation_bins = None
if self.accumulator_type == accumulator_types.mean:
pair_separation_bins = bins_3d.Bins3D(
dims=np.array([NUM_BINS_X, NUM_BINS_Y, NUM_BINS_Z]),
ranges=np.array([[0, 0, self.min_distance],
[self.max_parallel_separation, self.max_transverse_separation,
self.max_distance]]))
pair_separation_bins.set_filename(settings.get_mean_estimator_bins())
elif self.accumulator_type == accumulator_types.mean_subsample:
pair_separation_bins = bins_3d_with_group_id.Bins3DWithGroupID(
dims=np.array([NUM_BINS_X, NUM_BINS_Y, NUM_BINS_Z]),
ranges=np.array([[0, 0, self.min_distance],
[self.max_parallel_separation, self.max_transverse_separation,
self.max_distance]]))
pair_separation_bins.set_filename(settings.get_correlation_estimator_subsamples_npz())
elif self.accumulator_type == accumulator_types.histogram:
pair_separation_bins = flux_histogram_bins.FluxHistogramBins(
dims=np.array([NUM_BINS_X, NUM_BINS_Y, 1000]),
ranges=np.array([[0, 0, -2e3],
[self.max_parallel_separation, self.max_transverse_separation, 2e3]]))
pair_separation_bins.set_filename(settings.get_median_estimator_bins())
assert pair_separation_bins
return pair_separation_bins
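    # For reference, the grids created above (with the module-level defaults):
    #   mean / mean_subsample: 50 x 50 x NUM_BINS_Z bins spanning
    #       r_parallel in [0, max_parallel_separation],
    #       r_transverse in [0, max_transverse_separation],
    #       mean comoving distance in [min_distance, max_distance];
    #   histogram: 50 x 50 x 1000 bins, where the third axis is the weighted histogram
    #       of flux products over [-2e3, 2e3].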
def add_qso_pairs_to_bins(self, pairs, pairs_angles, delta_t_file):
"""
        :type pairs: np.ndarray
        :type pairs_angles: np.ndarray
:type delta_t_file: NpSpectrumContainer
:rtype: AccumulatorBase
"""
pair_separation_bins = self.create_bins()
self.apply_to_flux_pairs(pairs, pairs_angles, delta_t_file, pair_separation_bins)
return pair_separation_bins
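

# The helper below is an illustrative, self-contained NumPy sketch of the 'mean' mode
# accumulation, added for clarity only; it is not called by the pipeline, which uses
# lyacorr_cython_helper (with PixelPairs.find_nearby_pixels2 as the pure-Python
# reference). `dims` and `ranges` follow the same convention as the accumulators:
# dims = (n_parallel, n_transverse, n_distance) and ranges = [[mins], [maxs]].
def _pair_binning_sketch(spec1_distances, spec2_distances,
                         spec1_flux, spec2_flux,
                         qso1_weights, qso2_weights,
                         qso_angle, dims, ranges):
    """Return (sum of weighted flux products, sum of weights) per 3D bin."""
    # pairwise separations: r_parallel = |r1 - r2|, r_transverse = (r1 + r2)/2 * angle
    r_parallel = np.abs(spec1_distances[:, None] - spec2_distances[None, :])
    mean_distance = (spec1_distances[:, None] + spec2_distances[None, :]) / 2.
    r_transverse = mean_distance * qso_angle
    # per-pair weights and weighted flux products
    weights = np.outer(qso1_weights, qso2_weights)
    weighted_flux = np.outer(spec1_flux, spec2_flux) * weights
    # bin everything in one pass; pairs outside the ranges are ignored by histogramdd
    sample = np.column_stack([r_parallel.ravel(), r_transverse.ravel(), mean_distance.ravel()])
    bin_ranges = list(zip(ranges[0], ranges[1]))
    flux_sums, _ = np.histogramdd(sample, bins=dims, range=bin_ranges, weights=weighted_flux.ravel())
    weight_sums, _ = np.histogramdd(sample, bins=dims, range=bin_ranges, weights=weights.ravel())
    return flux_sums, weight_sums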