Skip to content

Commit

Permalink
Merge pull request #14 from schuenke/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
schuenke committed Mar 11, 2021
2 parents a5043c2 + d5cca25 commit 7406d0f
Show file tree
Hide file tree
Showing 12 changed files with 396 additions and 77 deletions.
24 changes: 13 additions & 11 deletions bmctool/bmc_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,26 +137,25 @@ def run_parallel(self):
f"not suitable for parallelized computation. \nPlease switch to sequential "
f"computation by changing the 'par_calc' option from 'True' to 'False'.")

# calculate number of event blocks EXCLUDING all m0 events
# calculate number of event blocks EXCLUDING all unsaturated m0 events
n_block_events = len(block_events) - m0_event_count

# get number of blocks per offsets
n_ = n_block_events / self.n_offsets
if not n_.is_integer():
raise Exception(f"Calculated number of block events per offset ({n_block_events}/{self.n_offsets} = {n_}) "
f"is not an integer. Aborting parallel computation.")
else:
n_ = int(n_)
n_: int = int(n_)

# get dict with all block events of 1st offset. This will be applied (w. adjusted freq) to all offsets.
event_table_single_offset = {k: block_events[k] for k in list(block_events)[m0_event_count:m0_event_count + n_]}
# get dict with all block events of 2nd offset. This will be applied (w. adjusted freq) to all offsets.
event_table_single_offset = {k: block_events[k] for k in list(block_events)[m0_event_count+n_:m0_event_count+2*n_]}

# extract the offsets in rad from rf library
events_freq = [self.seq.rf_library.data[k][4] for k in list(self.seq.rf_library.data)]
events_phase = [self.seq.rf_library.data[k][5] for k in list(self.seq.rf_library.data)]

# check if 0 ppm is in the events. As all rf events with freq = 0 ppm have the same phase value of 0,
# independent of the number of pulses per saturation train only one single rf block event appears. For the
# check if 0 ppm is in the events. Because all rf events with freq = 0 ppm have the same phase value of 0
# (independent of the number of pulses per saturation train), only one single rf block event appears. For the
# parallel computation, this event has to be duplicated and inserted into the event block dict until the
# number of entries matches the number of entries at the other offsets.
if 0.0 in events_freq:
Expand All @@ -168,9 +167,10 @@ def run_parallel(self):
'Unexpected number of block events. The current scenario is probably not suitable for '
'the parallel computation in the current form')

idx_zero = events_freq.index(0.0)
events_freq[idx_zero:idx_zero] = [0.0] * (n_rf_per_offset - 1)
events_phase[idx_zero:idx_zero] = [events_freq[idx_zero]] * (n_rf_per_offset - 1)
if n_rf_per_offset > 1:
idx_zero = events_freq.index(0.0)
events_freq[idx_zero:idx_zero] = [0.0] * (n_rf_per_offset - 1)
events_phase[idx_zero:idx_zero] = [events_freq[idx_zero]] * (n_rf_per_offset - 1)

else:
n_rf_per_offset = len(events_freq) / self.n_offsets
Expand Down Expand Up @@ -254,7 +254,9 @@ def run_parallel(self):

elif hasattr(block, 'gx') and hasattr(block, 'gy') and hasattr(block, 'gz'):
dur_ = float(block.gx.rise_time + block.gx.flat_time + block.gx.fall_time)
self.bm_solver.update_matrix(0, 0, 0)
self.bm_solver.update_matrix(rf_amp=0.0,
rf_phase=np.zeros(self.n_offsets),
rf_freq=np.zeros(self.n_offsets))
M_ = self.bm_solver.solve_equation(mag=M_, dtp=dur_)
M_[:, 0:(len(self.params.cest_pools) + 1) * 2] = 0.0 # assume complete spoiling
else:
Expand Down
Empty file.
42 changes: 42 additions & 0 deletions bmctool/utils/pulses/calc_power_equivalents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""
calc_power_equivalents.py
"""

import numpy as np
from types import SimpleNamespace


def calc_power_equivalent(rf_pulse: SimpleNamespace,
tp: float,
td: float,
gamma_hz: float = 42.5764)\
-> np.ndarray:
"""
Calculates the continuous wave power equivalent for a given rf pulse.
:param rf_pulse: pypulseq radio-frequency pulse
:param tp: pulse duration [s]
:param td: interpulse delay [s]
:param gamma_hz: gyromagnetic ratio [Hz]
"""
amp = rf_pulse.signal/gamma_hz
duty_cycle = tp / (tp + td)

return np.sqrt(np.trapz(amp**2, rf_pulse.t) / tp * duty_cycle) # continuous wave power equivalent


def calc_amplitude_equivalent(rf_pulse: SimpleNamespace,
tp: float,
td: float,
gamma_hz: float = 42.5764)\
-> np.ndarray:
"""
Calculates the continuous wave amplitude equivalent for a given rf pulse.
:param rf_pulse: pypulseq radio-frequency pulse
:param tp: pulse duration [s]
:param td: interpulse delay [s]
:param gamma_hz: gyromagnetic ratio [Hz]
"""
duty_cycle = tp / (tp + td)
alpha_rad = np.trapz(rf_pulse.signal * gamma_hz * 360, rf_pulse.t) * np.pi / 180

return alpha_rad / (gamma_hz * 2 * np.pi * tp) * duty_cycle # continuous wave amplitude equivalent
31 changes: 31 additions & 0 deletions bmctool/utils/pulses/calculate_phase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
calculate_phase.py
Function to calculate phase modulation for a given frequency modulation.
"""

import numpy as np


def calculate_phase(frequency: np.ndarray,
duration: float,
samples: int,
shift_idx: int = -1,
pos_offsets: bool = False) \
-> np.ndarray:
"""
Calculates phase modulation for a given frequency modulation.
:param frequency: frequency modulation of pulse
:param duration: pulse pulse_duration [s]
:param samples: number of sample points
:param shift_idx: index of entry in frequency used to shift phase (default is last entry -> idx = -1)
:param pos_offsets: flag needed to shift phase in [0 2pi] for offsets > 0
"""
phase = np.zeros_like(frequency)
for i in range(1, samples):
phase[i] = phase[i-1] + (frequency[i] * duration/samples)
phase_shift = phase[shift_idx]
for i in range(samples):
phase[i] = np.fmod(phase[i]+1e-12 - phase_shift, 2 * np.pi)
if not pos_offsets:
phase += 2 * np.pi
return phase
45 changes: 45 additions & 0 deletions bmctool/utils/pulses/create_arbitrary_pulse_with_phase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""
create_arbitrary_pulse_with_phase.py
Function to create a radio-frequency pulse event with arbitrary pulse shape and phase modulation.
"""

import numpy as np
from types import SimpleNamespace
from pypulseq.opts import Opts


def create_arbitrary_pulse_with_phase(signal: np.ndarray,
flip_angle: float,
freq_offset: float = 0,
phase_offset: float = 0,
system: Opts = Opts()) \
-> (SimpleNamespace, None):
"""
Creates a radio-frequency pulse event with arbitrary pulse shape and phase modulation
:param signal: signal modulation (amplitude and phase) of pulse
:param flip_angle: flip angle of pulse [rad]
:param freq_offset: frequency offset [Hz]
:param phase_offset: phase offset [rad]
:param system: system limits of the MR scanner
:return:
"""

signal *= (flip_angle / (2 * np.pi))
t = np.linspace(1, len(signal)) * system.rf_raster_time

rf = SimpleNamespace()
rf.type = 'rf'
rf.signal = signal
rf.t = t
rf.freq_offset = freq_offset
rf.phase_offset = phase_offset
rf.dead_time = system.rf_dead_time
rf.ringdown_time = system.rf_ringdown_time
rf.delay = system.rf_dead_time

if rf.ringdown_time > 0:
t_fill = np.arange(1, round(rf.ringdown_time / 1e-6) + 1) * 1e-6
rf.t = np.concatenate((rf.t, rf.t[-1] + t_fill))
rf.signal = np.concatenate((rf.signal, np.zeros(len(t_fill))))

return rf, None
24 changes: 24 additions & 0 deletions bmctool/utils/pulses/make_hanning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import numpy as np
from types import SimpleNamespace
from scipy.signal import hanning
from pypulseq.opts import Opts
from pypulseq.make_gauss_pulse import make_gauss_pulse


def make_gauss_hanning(flip_angle: float,
pulse_duration: float,
system: Opts = Opts())\
-> SimpleNamespace:
"""
Creates a radio-frequency pulse event for a gauss pulse with hanning window.
:param flip_angle: flip_angle of the rf pulse
:param pulse_duration: rf pulse duration [s]
:param system: system limits of the MR scanner
"""

rf_pulse, _, _ = make_gauss_pulse(flip_angle=flip_angle, duration=pulse_duration, system=system)
n_signal = np.sum(np.abs(rf_pulse.signal) > 0)
hanning_shape = hanning(n_signal + 2)
rf_pulse.signal[:n_signal] = hanning_shape[1:-1] / np.trapz(rf_pulse.t[:n_signal], hanning_shape[1:-1]) * \
(flip_angle / (2 * np.pi))
return rf_pulse
193 changes: 193 additions & 0 deletions bmctool/utils/pulses/make_hsexp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
"""
make_hsexp.py
Function to create all possible HSExp pulses (tip-down/tip-up & pos/neg offset)
"""

import numpy as np
from types import SimpleNamespace
from pypulseq.opts import Opts
from bmctool.utils.pulses.create_arbitrary_pulse_with_phase import create_arbitrary_pulse_with_phase
from bmctool.utils.pulses.make_hypsec_half_passage import calculate_amplitude as hypsec_amp
from bmctool.utils.pulses.calculate_phase import calculate_phase


def calculate_window_modulation(t: np.ndarray,
t0: float) \
-> np.ndarray:
"""
Calculates modulation function for HSExp pulses.
:param t: time points of the different sample points [s]
:param t0: reference time point (= last point for half passage pulse) [s]
:return:
"""
return 0.42 - 0.5 * np.cos(np.pi * t / t0) + 0.08 * np.cos(2 * np.pi * t / t0)


def calculate_frequency(t: np.ndarray,
t0: float,
bandwidth: float,
ef: float,
freq_factor: int) \
-> np.ndarray:
"""
Calculates modulation function for HSExp pulses.
:param t: time points of the different sample points [s]
:param t0: reference time point (= last point for half passage pulse) [s]
:param bandwidth: bandwidth of the pulse [Hz]
:param ef: dimensionless parameter to control steepness of the exponential curve
:param freq_factor: factor (-1 or +1) to switch between positive and negative offsets
"""

return -freq_factor * bandwidth * np.pi * np.exp(-t / t0 * ef)


def make_hsexp(amp: float = 1.0,
t_p: float = 12e-3,
mu: float = 65,
bandwidth: float = 2500,
t_window: float = 3.5e-3,
ef: float = 3.5,
tip_down: bool = True,
pos_offset: bool = True,
system: Opts = Opts(),
gamma_hz: float = 42.5764) \
-> SimpleNamespace:
"""
Creates a radio-frequency pulse event with amplitude and phase modulation of a HSExp pulse.
:param amp: maximum amplitude value [µT]
:param t_p: pulse pulse_duration [s]
:param mu: parameter µ of hyperbolic secant pulse
:param bandwidth: bandwidth of hyperbolic secant pulse [Hz]
:param t_window: pulse_duration of window function
:param ef: dimensionless parameter to control steepness of the exponential curve
:param system: system limits of the MR scanner
:param gamma_hz: gyromagnetic ratio [Hz]
"""

samples = int(t_p * 1e6)
t_pulse = np.divide(np.arange(1, samples + 1), samples) * t_p # time point array

# find start index of window function
idx_window = np.argmin(np.abs(t_pulse - t_window))

if tip_down:
shift_idx = -1
else:
shift_idx = 0

# calculate amplitude of hyperbolic secant (HS) pulse
w1 = hypsec_amp(t_pulse, t_pulse[shift_idx], amp, mu, bandwidth)

# calculate and apply modulation function to convert HS into HSExp pulse
window_mod = calculate_window_modulation(t_pulse[:idx_window], t_pulse[idx_window])
if tip_down:
w1[:idx_window] = w1[:idx_window] * window_mod
else:
w1[-idx_window:] = w1[-idx_window:] * np.flip(window_mod)

# calculate freq modulation of pulse
if tip_down and pos_offset:
dfreq = calculate_frequency(t_pulse, t_pulse[-1], bandwidth, ef, 1)
elif tip_down and not pos_offset:
dfreq = calculate_frequency(t_pulse, t_pulse[-1], bandwidth, ef, -1)
elif not tip_down and pos_offset:
dfreq = calculate_frequency(np.flip(t_pulse), t_pulse[-1], bandwidth, ef, 1)
elif not tip_down and not pos_offset:
dfreq = calculate_frequency(np.flip(t_pulse), t_pulse[-1], bandwidth, ef, -1)

# make freq modulation end (in case of tip-down) or start (in case of tip-up) with dw = 0
diff_idx = np.argmin(np.abs(dfreq))
dfreq -= dfreq[diff_idx]

# calculate phase (= integrate over dfreq)
dphase = calculate_phase(dfreq, t_p, samples, shift_idx=shift_idx, pos_offsets=pos_offset)

# create pypulseq rf pulse object
signal = w1 * np.exp(1j * dphase) # create complex array with amp and phase
flip_angle = gamma_hz * 2 * np.pi
hsexp, _ = create_arbitrary_pulse_with_phase(signal=signal, flip_angle=flip_angle, system=system)

return hsexp


def generate_hsexp_dict(amp: float = 1.0,
t_p: float = 12e-3,
mu: float = 65,
bandwidth: float = 2500,
t_window: float = 3.5e-3,
ef: float = 3.5,
system: Opts = Opts(),
gamma_hz: float = 42.5764) \
-> dict:
"""
Creates a dictionary with the 4 different hsexp pulses (tip-down/up and pos/neg offsets)
:param amp: maximum amplitude value [µT]
:param t_p: pulse pulse_duration [s]
:param mu: parameter µ of hyperbolic secant pulse
:param bandwidth: bandwidth of hyperbolic secant pulse [Hz]
:param t_window: pulse_duration of window function
:param ef: dimensionless parameter to control steepness of the exponential curve
:param system: system limits of the MR scanner
:param gamma_hz: gyromagnetic ratio [Hz]
:return:
"""

pulse_dict = {} # create empty dict for the 4 different pulses

# tip-down positive offset
pre_pos = make_hsexp(amp=amp,
t_p=t_p,
mu=mu,
bandwidth=bandwidth,
t_window=t_window,
ef=ef,
tip_down=True,
pos_offset=True,
system=system,
gamma_hz=gamma_hz)

pulse_dict.update({'pre_pos': pre_pos})

# tip-down negative offset
pre_neg = make_hsexp(amp=amp,
t_p=t_p,
mu=mu,
bandwidth=bandwidth,
t_window=t_window,
ef=ef,
tip_down=True,
pos_offset=False,
system=system,
gamma_hz=gamma_hz)

pulse_dict.update({'pre_neg': pre_neg})

# tip-up positive offsets
post_pos = make_hsexp(amp=amp,
t_p=t_p,
mu=mu,
bandwidth=bandwidth,
t_window=t_window,
ef=ef,
tip_down=False,
pos_offset=True,
system=system,
gamma_hz=gamma_hz)

pulse_dict.update({'post_pos': post_pos})

# tip-up negative offsets
post_neg = make_hsexp(amp=amp,
t_p=t_p,
mu=mu,
bandwidth=bandwidth,
t_window=t_window,
ef=ef,
tip_down=False,
pos_offset=False,
system=system,
gamma_hz=gamma_hz)

pulse_dict.update({'post_neg': post_neg})

return pulse_dict
Loading

0 comments on commit 7406d0f

Please sign in to comment.