diff --git a/bmctool/bmc_tool.py b/bmctool/bmc_tool.py index d74251b..c1ee7bf 100644 --- a/bmctool/bmc_tool.py +++ b/bmctool/bmc_tool.py @@ -137,7 +137,7 @@ def run_parallel(self): f"not suitable for parallelized computation. \nPlease switch to sequential " f"computation by changing the 'par_calc' option from 'True' to 'False'.") - # calculate number of event blocks EXCLUDING all m0 events + # calculate number of event blocks EXCLUDING all unsaturated m0 events n_block_events = len(block_events) - m0_event_count # get number of blocks per offsets @@ -145,18 +145,17 @@ def run_parallel(self): if not n_.is_integer(): raise Exception(f"Calculated number of block events per offset ({n_block_events}/{self.n_offsets} = {n_}) " f"is not an integer. Aborting parallel computation.") - else: - n_ = int(n_) + n_: int = int(n_) - # get dict with all block events of 1st offset. This will be applied (w. adjusted freq) to all offsets. - event_table_single_offset = {k: block_events[k] for k in list(block_events)[m0_event_count:m0_event_count + n_]} + # get dict with all block events of 2nd offset. This will be applied (w. adjusted freq) to all offsets. + event_table_single_offset = {k: block_events[k] for k in list(block_events)[m0_event_count+n_:m0_event_count+2*n_]} # extract the offsets in rad from rf library events_freq = [self.seq.rf_library.data[k][4] for k in list(self.seq.rf_library.data)] events_phase = [self.seq.rf_library.data[k][5] for k in list(self.seq.rf_library.data)] - # check if 0 ppm is in the events. As all rf events with freq = 0 ppm have the same phase value of 0, - # independent of the number of pulses per saturation train only one single rf block event appears. For the + # check if 0 ppm is in the events. Because all rf events with freq = 0 ppm have the same phase value of 0 + # (independent of the number of pulses per saturation train), only one single rf block event appears. For the # parallel computation, this event has to be duplicated and inserted into the event block dict until the # number of entries matches the number of entries at the other offsets. if 0.0 in events_freq: @@ -168,9 +167,10 @@ def run_parallel(self): 'Unexpected number of block events. The current scenario is probably not suitable for ' 'the parallel computation in the current form') - idx_zero = events_freq.index(0.0) - events_freq[idx_zero:idx_zero] = [0.0] * (n_rf_per_offset - 1) - events_phase[idx_zero:idx_zero] = [events_freq[idx_zero]] * (n_rf_per_offset - 1) + if n_rf_per_offset > 1: + idx_zero = events_freq.index(0.0) + events_freq[idx_zero:idx_zero] = [0.0] * (n_rf_per_offset - 1) + events_phase[idx_zero:idx_zero] = [events_freq[idx_zero]] * (n_rf_per_offset - 1) else: n_rf_per_offset = len(events_freq) / self.n_offsets @@ -254,7 +254,9 @@ def run_parallel(self): elif hasattr(block, 'gx') and hasattr(block, 'gy') and hasattr(block, 'gz'): dur_ = float(block.gx.rise_time + block.gx.flat_time + block.gx.fall_time) - self.bm_solver.update_matrix(0, 0, 0) + self.bm_solver.update_matrix(rf_amp=0.0, + rf_phase=np.zeros(self.n_offsets), + rf_freq=np.zeros(self.n_offsets)) M_ = self.bm_solver.solve_equation(mag=M_, dtp=dur_) M_[:, 0:(len(self.params.cest_pools) + 1) * 2] = 0.0 # assume complete spoiling else: diff --git a/bmctool/utils/pulses/__init__.py b/bmctool/utils/pulses/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bmctool/utils/pulses/calc_power_equivalents.py b/bmctool/utils/pulses/calc_power_equivalents.py new file mode 100644 index 0000000..981c4d0 --- /dev/null +++ b/bmctool/utils/pulses/calc_power_equivalents.py @@ -0,0 +1,42 @@ +""" + calc_power_equivalents.py +""" + +import numpy as np +from types import SimpleNamespace + + +def calc_power_equivalent(rf_pulse: SimpleNamespace, + tp: float, + td: float, + gamma_hz: float = 42.5764)\ + -> np.ndarray: + """ + Calculates the continuous wave power equivalent for a given rf pulse. + :param rf_pulse: pypulseq radio-frequency pulse + :param tp: pulse duration [s] + :param td: interpulse delay [s] + :param gamma_hz: gyromagnetic ratio [Hz] + """ + amp = rf_pulse.signal/gamma_hz + duty_cycle = tp / (tp + td) + + return np.sqrt(np.trapz(amp**2, rf_pulse.t) / tp * duty_cycle) # continuous wave power equivalent + + +def calc_amplitude_equivalent(rf_pulse: SimpleNamespace, + tp: float, + td: float, + gamma_hz: float = 42.5764)\ + -> np.ndarray: + """ + Calculates the continuous wave amplitude equivalent for a given rf pulse. + :param rf_pulse: pypulseq radio-frequency pulse + :param tp: pulse duration [s] + :param td: interpulse delay [s] + :param gamma_hz: gyromagnetic ratio [Hz] + """ + duty_cycle = tp / (tp + td) + alpha_rad = np.trapz(rf_pulse.signal * gamma_hz * 360, rf_pulse.t) * np.pi / 180 + + return alpha_rad / (gamma_hz * 2 * np.pi * tp) * duty_cycle # continuous wave amplitude equivalent diff --git a/bmctool/utils/pulses/calculate_phase.py b/bmctool/utils/pulses/calculate_phase.py new file mode 100644 index 0000000..b2269ff --- /dev/null +++ b/bmctool/utils/pulses/calculate_phase.py @@ -0,0 +1,31 @@ +""" +calculate_phase.py + Function to calculate phase modulation for a given frequency modulation. +""" + +import numpy as np + + +def calculate_phase(frequency: np.ndarray, + duration: float, + samples: int, + shift_idx: int = -1, + pos_offsets: bool = False) \ + -> np.ndarray: + """ + Calculates phase modulation for a given frequency modulation. + :param frequency: frequency modulation of pulse + :param duration: pulse pulse_duration [s] + :param samples: number of sample points + :param shift_idx: index of entry in frequency used to shift phase (default is last entry -> idx = -1) + :param pos_offsets: flag needed to shift phase in [0 2pi] for offsets > 0 + """ + phase = np.zeros_like(frequency) + for i in range(1, samples): + phase[i] = phase[i-1] + (frequency[i] * duration/samples) + phase_shift = phase[shift_idx] + for i in range(samples): + phase[i] = np.fmod(phase[i]+1e-12 - phase_shift, 2 * np.pi) + if not pos_offsets: + phase += 2 * np.pi + return phase diff --git a/bmctool/utils/pulses/create_arbitrary_pulse_with_phase.py b/bmctool/utils/pulses/create_arbitrary_pulse_with_phase.py new file mode 100644 index 0000000..32f375f --- /dev/null +++ b/bmctool/utils/pulses/create_arbitrary_pulse_with_phase.py @@ -0,0 +1,45 @@ +""" +create_arbitrary_pulse_with_phase.py + Function to create a radio-frequency pulse event with arbitrary pulse shape and phase modulation. +""" + +import numpy as np +from types import SimpleNamespace +from pypulseq.opts import Opts + + +def create_arbitrary_pulse_with_phase(signal: np.ndarray, + flip_angle: float, + freq_offset: float = 0, + phase_offset: float = 0, + system: Opts = Opts()) \ + -> (SimpleNamespace, None): + """ + Creates a radio-frequency pulse event with arbitrary pulse shape and phase modulation + :param signal: signal modulation (amplitude and phase) of pulse + :param flip_angle: flip angle of pulse [rad] + :param freq_offset: frequency offset [Hz] + :param phase_offset: phase offset [rad] + :param system: system limits of the MR scanner + :return: + """ + + signal *= (flip_angle / (2 * np.pi)) + t = np.linspace(1, len(signal)) * system.rf_raster_time + + rf = SimpleNamespace() + rf.type = 'rf' + rf.signal = signal + rf.t = t + rf.freq_offset = freq_offset + rf.phase_offset = phase_offset + rf.dead_time = system.rf_dead_time + rf.ringdown_time = system.rf_ringdown_time + rf.delay = system.rf_dead_time + + if rf.ringdown_time > 0: + t_fill = np.arange(1, round(rf.ringdown_time / 1e-6) + 1) * 1e-6 + rf.t = np.concatenate((rf.t, rf.t[-1] + t_fill)) + rf.signal = np.concatenate((rf.signal, np.zeros(len(t_fill)))) + + return rf, None diff --git a/bmctool/utils/pulses/make_hanning.py b/bmctool/utils/pulses/make_hanning.py new file mode 100644 index 0000000..063668b --- /dev/null +++ b/bmctool/utils/pulses/make_hanning.py @@ -0,0 +1,24 @@ +import numpy as np +from types import SimpleNamespace +from scipy.signal import hanning +from pypulseq.opts import Opts +from pypulseq.make_gauss_pulse import make_gauss_pulse + + +def make_gauss_hanning(flip_angle: float, + pulse_duration: float, + system: Opts = Opts())\ + -> SimpleNamespace: + """ + Creates a radio-frequency pulse event for a gauss pulse with hanning window. + :param flip_angle: flip_angle of the rf pulse + :param pulse_duration: rf pulse duration [s] + :param system: system limits of the MR scanner + """ + + rf_pulse, _, _ = make_gauss_pulse(flip_angle=flip_angle, duration=pulse_duration, system=system) + n_signal = np.sum(np.abs(rf_pulse.signal) > 0) + hanning_shape = hanning(n_signal + 2) + rf_pulse.signal[:n_signal] = hanning_shape[1:-1] / np.trapz(rf_pulse.t[:n_signal], hanning_shape[1:-1]) * \ + (flip_angle / (2 * np.pi)) + return rf_pulse diff --git a/bmctool/utils/pulses/make_hsexp.py b/bmctool/utils/pulses/make_hsexp.py new file mode 100644 index 0000000..e1045a6 --- /dev/null +++ b/bmctool/utils/pulses/make_hsexp.py @@ -0,0 +1,193 @@ +""" +make_hsexp.py + Function to create all possible HSExp pulses (tip-down/tip-up & pos/neg offset) +""" + +import numpy as np +from types import SimpleNamespace +from pypulseq.opts import Opts +from bmctool.utils.pulses.create_arbitrary_pulse_with_phase import create_arbitrary_pulse_with_phase +from bmctool.utils.pulses.make_hypsec_half_passage import calculate_amplitude as hypsec_amp +from bmctool.utils.pulses.calculate_phase import calculate_phase + + +def calculate_window_modulation(t: np.ndarray, + t0: float) \ + -> np.ndarray: + """ + Calculates modulation function for HSExp pulses. + :param t: time points of the different sample points [s] + :param t0: reference time point (= last point for half passage pulse) [s] + :return: + """ + return 0.42 - 0.5 * np.cos(np.pi * t / t0) + 0.08 * np.cos(2 * np.pi * t / t0) + + +def calculate_frequency(t: np.ndarray, + t0: float, + bandwidth: float, + ef: float, + freq_factor: int) \ + -> np.ndarray: + """ + Calculates modulation function for HSExp pulses. + :param t: time points of the different sample points [s] + :param t0: reference time point (= last point for half passage pulse) [s] + :param bandwidth: bandwidth of the pulse [Hz] + :param ef: dimensionless parameter to control steepness of the exponential curve + :param freq_factor: factor (-1 or +1) to switch between positive and negative offsets + """ + + return -freq_factor * bandwidth * np.pi * np.exp(-t / t0 * ef) + + +def make_hsexp(amp: float = 1.0, + t_p: float = 12e-3, + mu: float = 65, + bandwidth: float = 2500, + t_window: float = 3.5e-3, + ef: float = 3.5, + tip_down: bool = True, + pos_offset: bool = True, + system: Opts = Opts(), + gamma_hz: float = 42.5764) \ + -> SimpleNamespace: + """ + Creates a radio-frequency pulse event with amplitude and phase modulation of a HSExp pulse. + :param amp: maximum amplitude value [µT] + :param t_p: pulse pulse_duration [s] + :param mu: parameter µ of hyperbolic secant pulse + :param bandwidth: bandwidth of hyperbolic secant pulse [Hz] + :param t_window: pulse_duration of window function + :param ef: dimensionless parameter to control steepness of the exponential curve + :param system: system limits of the MR scanner + :param gamma_hz: gyromagnetic ratio [Hz] + """ + + samples = int(t_p * 1e6) + t_pulse = np.divide(np.arange(1, samples + 1), samples) * t_p # time point array + + # find start index of window function + idx_window = np.argmin(np.abs(t_pulse - t_window)) + + if tip_down: + shift_idx = -1 + else: + shift_idx = 0 + + # calculate amplitude of hyperbolic secant (HS) pulse + w1 = hypsec_amp(t_pulse, t_pulse[shift_idx], amp, mu, bandwidth) + + # calculate and apply modulation function to convert HS into HSExp pulse + window_mod = calculate_window_modulation(t_pulse[:idx_window], t_pulse[idx_window]) + if tip_down: + w1[:idx_window] = w1[:idx_window] * window_mod + else: + w1[-idx_window:] = w1[-idx_window:] * np.flip(window_mod) + + # calculate freq modulation of pulse + if tip_down and pos_offset: + dfreq = calculate_frequency(t_pulse, t_pulse[-1], bandwidth, ef, 1) + elif tip_down and not pos_offset: + dfreq = calculate_frequency(t_pulse, t_pulse[-1], bandwidth, ef, -1) + elif not tip_down and pos_offset: + dfreq = calculate_frequency(np.flip(t_pulse), t_pulse[-1], bandwidth, ef, 1) + elif not tip_down and not pos_offset: + dfreq = calculate_frequency(np.flip(t_pulse), t_pulse[-1], bandwidth, ef, -1) + + # make freq modulation end (in case of tip-down) or start (in case of tip-up) with dw = 0 + diff_idx = np.argmin(np.abs(dfreq)) + dfreq -= dfreq[diff_idx] + + # calculate phase (= integrate over dfreq) + dphase = calculate_phase(dfreq, t_p, samples, shift_idx=shift_idx, pos_offsets=pos_offset) + + # create pypulseq rf pulse object + signal = w1 * np.exp(1j * dphase) # create complex array with amp and phase + flip_angle = gamma_hz * 2 * np.pi + hsexp, _ = create_arbitrary_pulse_with_phase(signal=signal, flip_angle=flip_angle, system=system) + + return hsexp + + +def generate_hsexp_dict(amp: float = 1.0, + t_p: float = 12e-3, + mu: float = 65, + bandwidth: float = 2500, + t_window: float = 3.5e-3, + ef: float = 3.5, + system: Opts = Opts(), + gamma_hz: float = 42.5764) \ + -> dict: + """ + Creates a dictionary with the 4 different hsexp pulses (tip-down/up and pos/neg offsets) + :param amp: maximum amplitude value [µT] + :param t_p: pulse pulse_duration [s] + :param mu: parameter µ of hyperbolic secant pulse + :param bandwidth: bandwidth of hyperbolic secant pulse [Hz] + :param t_window: pulse_duration of window function + :param ef: dimensionless parameter to control steepness of the exponential curve + :param system: system limits of the MR scanner + :param gamma_hz: gyromagnetic ratio [Hz] + :return: + """ + + pulse_dict = {} # create empty dict for the 4 different pulses + + # tip-down positive offset + pre_pos = make_hsexp(amp=amp, + t_p=t_p, + mu=mu, + bandwidth=bandwidth, + t_window=t_window, + ef=ef, + tip_down=True, + pos_offset=True, + system=system, + gamma_hz=gamma_hz) + + pulse_dict.update({'pre_pos': pre_pos}) + + # tip-down negative offset + pre_neg = make_hsexp(amp=amp, + t_p=t_p, + mu=mu, + bandwidth=bandwidth, + t_window=t_window, + ef=ef, + tip_down=True, + pos_offset=False, + system=system, + gamma_hz=gamma_hz) + + pulse_dict.update({'pre_neg': pre_neg}) + + # tip-up positive offsets + post_pos = make_hsexp(amp=amp, + t_p=t_p, + mu=mu, + bandwidth=bandwidth, + t_window=t_window, + ef=ef, + tip_down=False, + pos_offset=True, + system=system, + gamma_hz=gamma_hz) + + pulse_dict.update({'post_pos': post_pos}) + + # tip-up negative offsets + post_neg = make_hsexp(amp=amp, + t_p=t_p, + mu=mu, + bandwidth=bandwidth, + t_window=t_window, + ef=ef, + tip_down=False, + pos_offset=False, + system=system, + gamma_hz=gamma_hz) + + pulse_dict.update({'post_neg': post_neg}) + + return pulse_dict diff --git a/bmctool/utils/seq/make_hypsec_half_passage_rf.py b/bmctool/utils/pulses/make_hypsec_half_passage.py similarity index 54% rename from bmctool/utils/seq/make_hypsec_half_passage_rf.py rename to bmctool/utils/pulses/make_hypsec_half_passage.py index 4946b7e..c90f5af 100644 --- a/bmctool/utils/seq/make_hypsec_half_passage_rf.py +++ b/bmctool/utils/pulses/make_hypsec_half_passage.py @@ -4,7 +4,9 @@ """ import numpy as np from types import SimpleNamespace -from bmctool.pypulseq.opts import Opts +from pypulseq.opts import Opts +from bmctool.utils.pulses.calculate_phase import calculate_phase +from bmctool.utils.pulses.create_arbitrary_pulse_with_phase import create_arbitrary_pulse_with_phase def calculate_amplitude(t: np.ndarray, @@ -20,7 +22,6 @@ def calculate_amplitude(t: np.ndarray, :param amp: maximum amplitude value [µT] :param mu: parameter µ of hyperbolic secant pulse :param bandwidth: bandwidth of hyperbolic secant pulse [Hz] - :return: """ return np.divide(amp, np.cosh((bandwidth * np.pi / mu) * (t - t_0))) @@ -36,69 +37,11 @@ def calculate_frequency(t: np.ndarray, :param t_0: reference time point (= last point for half passage pulse) [s] :param mu: parameter µ of hyperbolic secant pulse :param bandwidth: bandwidth of hyperbolic secant pulse [Hz] - :return: """ beta = bandwidth * np.pi / mu return bandwidth * np.pi * np.tanh(beta * (t - t_0)) -def calculate_phase(frequency: np.ndarray, - duration: float, - samples: int) \ - -> np.ndarray: - """ - Calculates phase modulation of hyperbolic secant half passage pulse for given frequency modulation. - :param frequency: frequency modulation of pulse - :param duration: pulse duration [s] - :param samples: number of sample points - :return: - """ - phase = frequency * duration / samples - for i in range(1, samples): - phase[i] = phase[i-1] + (frequency[i] * duration/samples) - phase_shift = phase[-1] - for i in range(samples): - phase[i] = np.fmod(phase[i] - phase_shift, 2 * np.pi) - return phase + 2 * np.pi - - -def make_arbitrary_rf_with_phase(signal: np.ndarray, - flip_angle: float, - freq_offset: float = 0, - phase_offset: float = 0, - system: Opts = Opts()) \ - -> (SimpleNamespace, None): - """ - Creates a radio-frequency pulse event with arbitrary pulse shape and phase modulation - :param signal: signal modulation (amplitude and phase) of pulse - :param flip_angle: flip angle of pulse [rad] - :param freq_offset: frequency offset [Hz] - :param phase_offset: phase offset [rad] - :param system: system limits of the MR scanner - :return: - """ - - signal = signal * flip_angle / (2 * np.pi) - t = np.linspace(1, len(signal)) * system.rf_raster_time - - rf = SimpleNamespace() - rf.type = 'rf' - rf.signal = signal - rf.t = t - rf.freq_offset = freq_offset - rf.phase_offset = phase_offset - rf.dead_time = system.rf_dead_time - rf.ringdown_time = system.rf_ringdown_time - rf.delay = system.rf_dead_time - - if rf.ringdown_time > 0: - t_fill = np.arange(1, round(rf.ringdown_time / 1e-6) + 1) * 1e-6 - rf.t = np.concatenate((rf.t, rf.t[-1] + t_fill)) - rf.signal = np.concatenate((rf.signal, np.zeros(len(t_fill)))) - - return rf, None - - def make_hypsec_half_passage_rf(amp: float, pulse_duration: float = 8e-3, mu: float = 6, @@ -108,11 +51,10 @@ def make_hypsec_half_passage_rf(amp: float, """ Creates block event for an hyperbolic secant half passage pulse according to DOI: 10.1002/mrm.26370. :param amp: maximum amplitude value [µT] - :param pulse_duration: pulse duration [s] + :param pulse_duration: pulse pulse_duration [s] :param mu: parameter µ of hyperbolic secant pulse :param bandwidth: bandwidth of hyperbolic secant pulse [Hz] :param system: system limits of the MR scanner - :return: """ samples = int(pulse_duration * 1e6) @@ -124,5 +66,5 @@ def make_hypsec_half_passage_rf(amp: float, phase = calculate_phase(frequency=freq, duration=pulse_duration, samples=samples) signal = np.multiply(w1, np.exp(1j * phase)) flip_angle = amp * 1e-6 * system.gamma * 2 * np.pi # factor 1e-6 converts from µT to T - hs_half_passage, _ = make_arbitrary_rf_with_phase(signal=signal, flip_angle=flip_angle, system=system) + hs_half_passage, _ = create_arbitrary_pulse_with_phase(signal=signal, flip_angle=flip_angle, system=system) return hs_half_passage diff --git a/bmctool/utils/seq/auxiliary.py b/bmctool/utils/seq/auxiliary.py new file mode 100644 index 0000000..0f77140 --- /dev/null +++ b/bmctool/utils/seq/auxiliary.py @@ -0,0 +1,39 @@ +""" +auxiliary.py + Auxiliary functions for seq files. +""" +from pypulseq.Sequence.sequence import Sequence +from bmctool.utils.seq.read import read_any_version + + +def get_offsets(seq: Sequence = None, + seq_file: str = None) \ + -> list: + """ + Reads offsets either from a seq file or from a Sequence object. + :param seq_file: sequence file to read the offsets from + :param seq: Sequence object to get the offsets from + :return: list of offsets + """ + if not seq and not seq_file: + raise ValueError('You need to pass either the sequence filename or the Sequence object.') + if not seq: + seq = read_any_version(seq_file=seq_file) + offsets = seq.definitions['offsets_ppm'] + return offsets + + +def get_num_adc_events(seq: Sequence = None, + seq_file: str = None) \ + -> int: + """ + Reads number of ADC events (should equal number of offsets). + :param seq: Sequence object to get the offsets from + :param seq_file: sequence file to read the offsets from + :return: num_adc_events + """ + if not seq and not seq_file: + raise ValueError('You need to pass either the sequence filename or the Sequence object.') + offsets = get_offsets(seq_file=seq_file) + num_adc_events = len(offsets) + return num_adc_events \ No newline at end of file diff --git a/bmctool/utils/seq/read.py b/bmctool/utils/seq/read.py index c9f137e..ec4ed0d 100644 --- a/bmctool/utils/seq/read.py +++ b/bmctool/utils/seq/read.py @@ -1,6 +1,6 @@ """ read.py - Functions for reading single entries from seq files. + Auxiliary functions for reading of seq files and seq file entries. """ from os import remove from pathlib import Path diff --git a/bmctool/utils/seq/write_seq.py b/bmctool/utils/seq/write.py similarity index 98% rename from bmctool/utils/seq/write_seq.py rename to bmctool/utils/seq/write.py index a15daa9..5f0090f 100644 --- a/bmctool/utils/seq/write_seq.py +++ b/bmctool/utils/seq/write.py @@ -1,5 +1,6 @@ """ - write_seq.py +write.py + Auxiliary functions for writing seq files. """ import math from os import fdopen, remove diff --git a/setup.py b/setup.py index ac7ff7f..9a850fa 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ name='BMCTool', author='Patrick Schuenke', author_email='patrick.schuenke@ptb.de', - version='0.2.2', + version='0.3.0', description='A python tool to perform Bloch-McConnell (BMC) simulations.', long_description=long_description, long_description_content_type="text/markdown",