Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various fixes and features for dealing with realistic data. #777

Merged
merged 4 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/toast/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ install(FILES
schedule_sim_ground.py
schedule_sim_satellite.py
widgets.py
hwp_utils.py
"RELEASE"
DESTINATION ${PYTHON_SITE}/toast
)
Expand Down
279 changes: 279 additions & 0 deletions src/toast/hwp_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
# Copyright (c) 2023-2024 by the parties listed in the AUTHORS file.
# All rights reserved. Use of this source code is governed by
# a BSD-style license that can be found in the LICENSE file.

import numpy as np
from scipy.linalg import eigvalsh, lu_factor, lu_solve

from .dist import distribute_uniform
from .mpi import MPI


def hwpss_samples(n_samp, comm):
    """Helper function to distribute samples.

    This distributes slices of samples uniformly among the processes
    of an MPI communicator.

    Args:
        n_samp (int):  The number of samples.
        comm (MPI.Comm):  The communicator, or None for the serial case.

    Returns:
        (slice):  The local slice on this process.

    """
    # Serial case:  this process owns the full sample range.
    if comm is None:
        return slice(0, n_samp, 1)
    # Uniform distribution of the samples over all ranks.  We only need
    # the offset and count for our own rank.
    dist = distribute_uniform(n_samp, comm.size)
    local = dist[comm.rank]
    return slice(local.offset, local.offset + local.n_elem, 1)


def hwpss_sincos_buffer(angles, flags, n_harmonics, comm=None):
    """Precompute sin / cos terms.

    Compute the harmonic terms involving sin / cos of the HWP angle.
    This is done once in parallel and used by each process for its
    local detectors.

    Args:
        angles (array):  The HWP angle at each time stamp.
        flags (array):  The flags indicating bad angle samples.
        n_harmonics (int):  The number of harmonics in the model.
        comm (MPI.Comm):  The optional communicator to parallelize
            the calculation.

    Returns:
        (array):  The full buffer of sin/cos factors on all processes.

    """
    # Local slice of samples for this process.
    slc = hwpss_samples(len(angles), comm)
    local_ang = np.copy(angles[slc])
    local_good = flags[slc] == 0
    # Two columns (sin, cos) per harmonic.  Flagged samples stay zero.
    buffer = np.zeros((len(local_ang), 2 * n_harmonics), dtype=np.float32)
    for harm in range(1, n_harmonics + 1):
        col = 2 * (harm - 1)
        buffer[local_good, col] = np.sin(harm * local_ang[local_good])
        buffer[local_good, col + 1] = np.cos(harm * local_ang[local_good])
    if comm is None:
        return buffer
    # Concatenate the per-process pieces so every rank has the full buffer.
    return np.vstack(comm.allgather(buffer))


def hwpss_compute_coeff_covariance(times, flags, sincos, comm=None):
    """Build covariance of HWPSS model coefficients.

    The HWPSS model for this function is the one used by the Maxipol
    and EBEX experiments.  See for example Joy Didier's thesis, equation
    8.17:  https://academiccommons.columbia.edu/doi/10.7916/D8MW2HCG

    The model with N harmonics and HWP angle H can be written as:

    h(t) = Sum(i=1...N) { [ C_i0 + C_i1 * t ] cos(i * H(t)) +
           [ C_i2 + C_i3 * t ] sin(i * H(t)) }

    Writing this in terms of the vector of coefficients and the matrix
    of sin / cos factors:

    h(t) = M x C

    where each row of M contains, for every harmonic i, the four entries

    [ sin(i H(t))  t sin(i H(t))  cos(i H(t))  t cos(i H(t)) ]

    The least squares solution for the coefficients is then

    C = (M^T M)^-1 M^T h(t)

    We then assume that h(t) is just the input data and that it is
    dominated by the HWPSS.  This function computes the covariance matrix
    (M^T M) and factors it for later use.

    Args:
        times (array):  The **relative** timestamps of the samples from
            the start of the observation.
        flags (array):  The flags indicating bad angle samples.
        sincos (array):  The pre-computed sin / cos terms.
        comm (MPI.Comm):  The optional communicator to parallelize
            the calculation.

    Returns:
        (tuple):  The LU factorization and pivots, or None if the
            covariance is too poorly conditioned.

    """
    if comm is None:
        # Serial case: all samples are local.
        slc = slice(0, len(times), 1)
    else:
        slc = hwpss_samples(len(times), comm)
    my_sincos = sincos[slc, :]
    my_times = times[slc]
    my_flags = flags[slc]

    n_harmonics = sincos.shape[1] // 2
    good = my_flags == 0

    # Build the local design matrix M restricted to good samples.  For each
    # harmonic h the four columns are [ sin, t * sin, cos, t * cos ], which
    # matches the coefficient ordering used in hwpss_compute_coeff().
    t_good = my_times[good]
    design = np.empty((np.count_nonzero(good), 4 * n_harmonics), dtype=np.float64)
    for h in range(n_harmonics):
        s = my_sincos[good, 2 * h]
        c = my_sincos[good, 2 * h + 1]
        design[:, 4 * h + 0] = s
        design[:, 4 * h + 1] = t_good * s
        design[:, 4 * h + 2] = c
        design[:, 4 * h + 3] = t_good * c

    # Local contribution to the covariance.  M^T M is symmetric by
    # construction, so no separate triangle mirroring is needed.
    my_cov = np.dot(design.T, design)

    # Accumulate across processes
    if comm is None:
        cov = my_cov
    else:
        cov = np.zeros((4 * n_harmonics, 4 * n_harmonics), dtype=np.float64)
        comm.Allreduce(my_cov, cov, op=MPI.SUM)

    # Check that condition number is reasonable.  The matrix is symmetric,
    # so the eigenvalue ratio is the reciprocal condition number.
    evals = eigvalsh(cov)
    rcond = np.min(evals) / np.max(evals)
    if rcond < 1.0e-8:
        return None

    # LU factorization for later solve
    cov_lu, cov_piv = lu_factor(cov)
    return cov_lu, cov_piv


def hwpss_compute_coeff(detdata, flags, times, sincos, cov_lu, cov_piv):
    """Compute the HWPSS model coefficients.

    See docstring for `hwpss_compute_coeff_covariance`.  This function
    computes the expression M^T h(t) and then uses the LU factorization
    of the covariance to solve for the coefficients.

    Args:
        detdata (array):  The detector data for one detector.
        flags (array):  The detector flags.
        times (array):  The **relative** timestamps of the samples from
            the start of the observation.
        sincos (array):  The pre-computed sin / cos terms.
        cov_lu (array):  The covariance LU factorization.
        cov_piv (array):  The covariance pivots.

    Returns:
        (array):  The coefficients.

    """
    n_harmonics = sincos.shape[1] // 2
    good = flags == 0
    # Work only with the unflagged samples, with the DC level of the good
    # samples removed.  (Renamed from `input`, which shadowed the builtin;
    # zeroing the flagged samples was redundant since only good samples
    # enter the dot products below.)
    signal = detdata[good] - np.mean(detdata[good])
    t_good = times[good]
    rhs = np.zeros(4 * n_harmonics, dtype=np.float64)
    for h in range(n_harmonics):
        s = sincos[good, 2 * h]
        c = sincos[good, 2 * h + 1]
        rhs[4 * h + 0] = np.dot(signal, s)
        rhs[4 * h + 1] = np.dot(signal, s * t_good)
        rhs[4 * h + 2] = np.dot(signal, c)
        rhs[4 * h + 3] = np.dot(signal, c * t_good)
    coeff = lu_solve((cov_lu, cov_piv), rhs)
    return coeff


def hwpss_build_model(times, flags, sincos, coeff):
    """Construct the HWPSS template from coefficients.

    Args:
        times (array):  The **relative** timestamps of the samples from
            the start of the observation.
        flags (array):  The flags indicating bad angle samples.
        sincos (array):  The pre-computed sin / cos terms.
        coeff (array):  The model coefficients for this detector.

    Returns:
        (array):  The template.

    """
    n_harmonics = sincos.shape[1] // 2
    good = flags == 0
    t_good = times[good]
    # Flagged samples are left at zero; only good samples accumulate the
    # four terms (constant and linear-in-time, for sin and cos) of each
    # harmonic.
    template = np.zeros(len(times), dtype=np.float64)
    for h in range(n_harmonics):
        s = sincos[good, 2 * h]
        c = sincos[good, 2 * h + 1]
        template[good] += coeff[4 * h + 0] * s
        template[good] += coeff[4 * h + 1] * (s * t_good)
        template[good] += coeff[4 * h + 2] * c
        template[good] += coeff[4 * h + 3] * (c * t_good)
    return template
1 change: 1 addition & 0 deletions src/toast/ops/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ install(FILES
weather_model.py
yield_cut.py
azimuth_intervals.py
calibrate.py
DESTINATION ${PYTHON_SITE}/toast/ops
)

Expand Down
1 change: 1 addition & 0 deletions src/toast/ops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .arithmetic import Combine
from .azimuth_intervals import AzimuthIntervals
from .cadence_map import CadenceMap
from .calibrate import CalibrateDetectors
from .common_mode_noise import CommonModeNoise
from .conviqt import SimConviqt, SimTEBConviqt, SimWeightedConviqt
from .copy import Copy
Expand Down
44 changes: 41 additions & 3 deletions src/toast/ops/azimuth_intervals.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,20 @@ class AzimuthIntervals(Operator):

cut_short = Bool(True, help="If True, remove very short scanning intervals")

cut_long = Bool(True, help="If True, remove very long scanning intervals")

short_limit = Quantity(
0.25 * u.dimensionless_unscaled,
help="Minimum length of a scan. Either the minimum length in time or a "
"fraction of median scan length",
)

long_limit = Quantity(
1.25 * u.dimensionless_unscaled,
help="Maximum length of a scan. Either the maximum length in time or a "
"fraction of median scan length",
)

scanning_interval = Unicode(
defaults.scanning_interval, help="Interval name for scanning"
)
Expand Down Expand Up @@ -172,6 +180,15 @@ def _exec(self, data, detectors=None, **kwargs):

begin_stable = np.where(stable[1:] - stable[:-1] == 1)[0]
end_stable = np.where(stable[:-1] - stable[1:] == 1)[0]

if len(begin_stable) == 0 or len(end_stable) == 0:
msg = f"Observation {obs.name} has no stable scanning"
msg += f" periods. You should cut this observation or"
msg += f" change the filter window. Flagging all samples"
msg += f" as unstable pointing."
log.warning(msg)
continue

if begin_stable[0] > end_stable[0]:
# We start in the middle of a scan
begin_stable = np.concatenate(([0], begin_stable))
Expand Down Expand Up @@ -204,6 +221,24 @@ def _exec(self, data, detectors=None, **kwargs):
stable_times = [
x for (x, y) in zip(stable_times, stable_bad) if not y
]
if self.cut_long:
stable_spans = np.array([(x[1] - x[0]) for x in stable_times])
try:
# First try long limit as time
stable_bad = stable_spans > self.long_limit.to_value(u.s)
except:
# Try long limit as fraction
median_stable = np.median(stable_spans)
stable_bad = stable_spans > self.long_limit * median_stable
begin_stable = np.array(
[x for (x, y) in zip(begin_stable, stable_bad) if not y]
)
end_stable = np.array(
[x for (x, y) in zip(end_stable, stable_bad) if not y]
)
stable_times = [
x for (x, y) in zip(stable_times, stable_bad) if not y
]

# The "throw" intervals extend from one turnaround to the next.
# We start the first throw at the beginning of the first stable scan
Expand All @@ -220,9 +255,12 @@ def _exec(self, data, detectors=None, **kwargs):
< 0
)[0]
if len(vel_switch) > 1:
msg = "Multiple turnarounds between end of stable scan at"
msg = f" sample {start_turn} and next start at {end_turn}"
raise RuntimeError(msg)
msg = f"{obs.name}: Multiple turnarounds between end of "
msg += "stable scan at"
msg += f" sample {start_turn} and next start at {end_turn}."
msg += " Cutting ."
log.warning(msg)
break
end_throw.append(start_turn + vel_switch[0])
begin_throw.append(end_throw[-1] + 1)
end_throw.append(end_stable[-1])
Expand Down
Loading
Loading