-
Notifications
You must be signed in to change notification settings - Fork 7
/
TDDR.py
92 lines (72 loc) · 2.94 KB
/
TDDR.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import numpy as np
from scipy.signal import butter, filtfilt
def TDDR(signal, sample_rate):
# This function is the reference implementation for the TDDR algorithm for
# motion correction of fNIRS data, as described in:
#
# Fishburn F.A., Ludlum R.S., Vaidya C.J., & Medvedev A.V. (2019).
# Temporal Derivative Distribution Repair (TDDR): A motion correction
# method for fNIRS. NeuroImage, 184, 171-179.
# https://doi.org/10.1016/j.neuroimage.2018.09.025
#
# Usage:
# signals_corrected = TDDR( signals , sample_rate );
#
# Inputs:
# signals: A [sample x channel] matrix of uncorrected optical density data
# sample_rate: A scalar reflecting the rate of acquisition in Hz
#
# Outputs:
# signals_corrected: A [sample x channel] matrix of corrected optical density data
signal = np.array(signal)
if len(signal.shape) != 1:
for ch in range(signal.shape[1]):
signal[:, ch] = TDDR(signal[:, ch], sample_rate)
return signal
# Preprocess: Separate high and low frequencies
filter_cutoff = .5
filter_order = 3
Fc = filter_cutoff * 2/sample_rate
signal_mean = np.mean(signal)
signal -= signal_mean
if Fc < 1:
fb, fa = butter(filter_order, Fc)
signal_low = filtfilt(fb, fa, signal, padlen=0)
else:
signal_low = signal
signal_high = signal - signal_low
# Initialize
tune = 4.685
D = np.sqrt(np.finfo(signal.dtype).eps)
mu = np.inf
iter = 0
# Step 1. Compute temporal derivative of the signal
deriv = np.diff(signal_low)
# Step 2. Initialize observation weights
w = np.ones(deriv.shape)
# Step 3. Iterative estimation of robust weights
while iter < 50:
iter = iter + 1
mu0 = mu
# Step 3a. Estimate weighted mean
mu = np.sum(w * deriv) / np.sum(w)
# Step 3b. Calculate absolute residuals of estimate
dev = np.abs(deriv - mu)
# Step 3c. Robust estimate of standard deviation of the residuals
sigma = 1.4826 * np.median(dev)
# Step 3d. Scale deviations by standard deviation and tuning parameter
r = dev / (sigma * tune)
# Step 3e. Calculate new weights according to Tukey's biweight function
w = ((1 - r**2) * (r < 1)) ** 2
# Step 3f. Terminate if new estimate is within machine-precision of old estimate
if abs(mu - mu0) < D * max(abs(mu), abs(mu0)):
break
# Step 4. Apply robust weights to centered derivative
new_deriv = w * (deriv - mu)
# Step 5. Integrate corrected derivative
signal_low_corrected = np.cumsum(np.insert(new_deriv, 0, 0.0))
# Postprocess: Center the corrected signal
signal_low_corrected = signal_low_corrected - np.mean(signal_low_corrected)
# Postprocess: Merge back with uncorrected high frequency component
signal_corrected = signal_low_corrected + signal_high + signal_mean
return signal_corrected