diff --git a/rapidtide/tests/test_delayestimation.py b/rapidtide/tests/test_delayestimation.py
index be3778b3..11233842 100755
--- a/rapidtide/tests/test_delayestimation.py
+++ b/rapidtide/tests/test_delayestimation.py
@@ -30,6 +30,7 @@
 import rapidtide.peakeval as tide_peakeval
 import rapidtide.resample as tide_resample
 import rapidtide.simfuncfit as tide_simfuncfit
+import rapidtide.util as tide_util

 try:
     import mkl
@@ -39,34 +40,6 @@
     mklexists = False


-def numpy2shared(inarray, thetype):
-    thesize = inarray.size
-    theshape = inarray.shape
-    if thetype == np.float64:
-        inarray_shared = mp.RawArray("d", inarray.reshape(thesize))
-    else:
-        inarray_shared = mp.RawArray("f", inarray.reshape(thesize))
-    inarray = np.frombuffer(inarray_shared, dtype=thetype, count=thesize)
-    inarray.shape = theshape
-    return inarray
-
-
-def allocshared(theshape, thetype):
-    thesize = int(1)
-    if not isinstance(theshape, (list, tuple)):
-        thesize = theshape
-    else:
-        for element in theshape:
-            thesize *= int(element)
-    if thetype == np.float64:
-        outarray_shared = mp.RawArray("d", thesize)
-    else:
-        outarray_shared = mp.RawArray("f", thesize)
-    outarray = np.frombuffer(outarray_shared, dtype=thetype, count=thesize)
-    outarray.shape = theshape
-    return outarray, outarray_shared, theshape
-
-
 def multisine(timepoints, parameterlist):
     output = timepoints * 0.0
     for element in parameterlist:
@@ -144,7 +117,7 @@ def test_delayestimation(displayplots=False, debug=False):
         plt.show()

     threshval = pedestal / 4.0
-    waveforms = numpy2shared(waveforms, np.float64)
+    waveforms, waveforms_shm = tide_util.numpy2shared(waveforms, np.float64)

     referencetc = tide_resample.doresample(
         timepoints, waveforms[refnum, :], oversamptimepoints, method=interptype
@@ -170,8 +143,8 @@ def test_delayestimation(displayplots=False, debug=False):
     dummy, trimmedcorrscale, dummy = theCorrelator.getfunction()
     corroutlen = np.shape(trimmedcorrscale)[0]
     internalvalidcorrshape = (numlocs, corroutlen)
-    corrout, dummy, dummy = allocshared(internalvalidcorrshape, np.float64)
-    meanval, dummy, dummy = allocshared((numlocs), np.float64)
+    corrout, corrout_shm = tide_util.allocshared(internalvalidcorrshape, np.float64)
+    meanval, meanval_shm = tide_util.allocshared((numlocs), np.float64)
     if debug:
         print("corrout shape:", corrout.shape)
         print("theCorrelator: corroutlen=", corroutlen)
@@ -210,21 +183,21 @@ def test_delayestimation(displayplots=False, debug=False):
         peakfittype=peakfittype,
     )

-    lagtc, dummy, dummy = allocshared(waveforms.shape, np.float64)
-    fitmask, dummy, dummy = allocshared((numlocs), "uint16")
-    failreason, dummy, dummy = allocshared((numlocs), "uint32")
-    lagtimes, dummy, dummy = allocshared((numlocs), np.float64)
-    lagstrengths, dummy, dummy = allocshared((numlocs), np.float64)
-    lagsigma, dummy, dummy = allocshared((numlocs), np.float64)
-    gaussout, dummy, dummy = allocshared(internalvalidcorrshape, np.float64)
-    windowout, dummy, dummy = allocshared(internalvalidcorrshape, np.float64)
-    rvalue, dummy, dummy = allocshared((numlocs), np.float64)
-    r2value, dummy, dummy = allocshared((numlocs), np.float64)
-    fitcoff, dummy, dummy = allocshared((waveforms.shape), np.float64)
-    fitNorm, dummy, dummy = allocshared((waveforms.shape), np.float64)
-    R2, dummy, dummy = allocshared((numlocs), np.float64)
-    movingsignal, dummy, dummy = allocshared(waveforms.shape, np.float64)
-    filtereddata, dummy, dummy = allocshared(waveforms.shape, np.float64)
+    lagtc, lagtc_shm = tide_util.allocshared(waveforms.shape, np.float64)
+    fitmask, fitmask_shm = tide_util.allocshared((numlocs), "uint16")
+    failreason, failreason_shm = tide_util.allocshared((numlocs), "uint32")
+    lagtimes, lagtimes_shm = tide_util.allocshared((numlocs), np.float64)
+    lagstrengths, lagstrengths_shm = tide_util.allocshared((numlocs), np.float64)
+    lagsigma, lagsigma_shm = tide_util.allocshared((numlocs), np.float64)
+    gaussout, gaussout_shm = tide_util.allocshared(internalvalidcorrshape, np.float64)
+    windowout, windowout_shm = tide_util.allocshared(internalvalidcorrshape, np.float64)
+    rvalue, rvalue_shm = tide_util.allocshared((numlocs), np.float64)
+    r2value, r2value_shm = tide_util.allocshared((numlocs), np.float64)
+    fitcoff, fitcoff_shm = tide_util.allocshared((waveforms.shape), np.float64)
+    fitNorm, fitNorm_shm = tide_util.allocshared((waveforms.shape), np.float64)
+    R2, R2_shm = tide_util.allocshared((numlocs), np.float64)
+    movingsignal, movingsignal_shm = tide_util.allocshared(waveforms.shape, np.float64)
+    filtereddata, filtereddata_shm = tide_util.allocshared(waveforms.shape, np.float64)

     for nprocs in [4, 1]:
         # call correlationpass
@@ -357,7 +330,7 @@ def test_delayestimation(displayplots=False, debug=False):
         ax.legend()
         plt.show()

-    filteredwaveforms, dummy, dummy = allocshared(waveforms.shape, np.float64)
+    filteredwaveforms, filteredwaveforms_shm = tide_util.allocshared(waveforms.shape, np.float64)
     for i in range(numlocs):
         filteredwaveforms[i, :] = theprefilter.apply(Fs, waveforms[i, :])
diff --git a/rapidtide/tests/test_sharedmem.py b/rapidtide/tests/test_sharedmem.py
new file mode 100755
index 00000000..4c59fa57
--- /dev/null
+++ b/rapidtide/tests/test_sharedmem.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright 2016-2024 Blaise Frederick
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import numpy as np
+
+import rapidtide.util as tide_util
+
+
+def test_numpy2shared(debug=False):
+    vectorlen = 1000
+    for intype in [np.float32, np.float64]:
+        sourcevector = np.random.normal(size=vectorlen).astype(intype)
+        if debug:
+            print(f"{intype=}, {sourcevector.size=}, {sourcevector.dtype=}")
+        for outtype in [np.float32, np.float64]:
+
+            destvector, shm = tide_util.numpy2shared(sourcevector, outtype)
+            if debug:
+                print(f"\t{outtype=}, {destvector.size=}, {destvector.dtype=}")
+
+            # check everything
+            assert destvector.dtype == outtype
+            assert destvector.size == sourcevector.size
+            np.testing.assert_almost_equal(sourcevector, destvector, 3)
+
+            # clean up
+            tide_util.cleanup_shm(shm)
+
+
+def test_allocshared(debug=False):
+    datashape = (10, 10, 10)
+    for outtype in [np.float32, np.float64]:
+        destarray, shm = tide_util.allocshared(datashape, outtype)
+        if debug:
+            print(f"{outtype=}, {destarray.size=}, {destarray.dtype=}")
+
+        # check everything
+        assert destarray.dtype == outtype
+        assert destarray.size == np.prod(datashape)
+
+        # clean up if needed
+        tide_util.cleanup_shm(shm)
+
+
+if __name__ == "__main__":
+    test_numpy2shared(debug=True)
+    test_allocshared(debug=True)
diff --git a/rapidtide/util.py b/rapidtide/util.py
index 58601e74..9b19781c 100644
--- a/rapidtide/util.py
+++ b/rapidtide/util.py
@@ -18,7 +18,6 @@
 #
 import bisect
 import logging
-import multiprocessing as mp
 import os
 import platform
 import resource
@@ -27,6 +26,7 @@
 import sys
 import time
 from datetime import datetime
+from multiprocessing import RawArray, shared_memory

 import matplotlib.pyplot as plt
 import numpy as np
@@ -1003,29 +1003,28 @@ def comparehappyruns(root1, root2, debug=False):


 # shared memory routines
-def numpy2shared(inarray, thetype):
-    thesize = inarray.size
-    theshape = inarray.shape
-    if thetype == np.float64:
-        inarray_shared = mp.RawArray("d", inarray.reshape(thesize))
-    else:
-        inarray_shared = mp.RawArray("f", inarray.reshape(thesize))
-    inarray = np.frombuffer(inarray_shared, dtype=thetype, count=thesize)
-    inarray.shape = theshape
-    return inarray
+def numpy2shared(inarray, theouttype):
+    # Create a shared memory block to store the array data
+    outnbytes = np.dtype(theouttype).itemsize * inarray.size
+    shm = shared_memory.SharedMemory(create=True, size=outnbytes)
+    inarray_shared = np.ndarray(inarray.shape, dtype=theouttype, buffer=shm.buf)
+    np.copyto(inarray_shared, inarray)  # Copy data to shared memory array
+    return inarray_shared, shm  # Return both the array and the shared memory object


 def allocshared(theshape, thetype):
-    thesize = int(1)
-    if not isinstance(theshape, (list, tuple)):
-        thesize = theshape
-    else:
-        for element in theshape:
-            thesize *= int(element)
-    if thetype == np.float64:
-        outarray_shared = mp.RawArray("d", thesize)
-    else:
-        outarray_shared = mp.RawArray("f", thesize)
-    outarray = np.frombuffer(outarray_shared, dtype=thetype, count=thesize)
-    outarray.shape = theshape
-    return outarray, outarray_shared, theshape
+    # Calculate size based on shape
+    thesize = np.prod(theshape)
+    # Determine the data type size
+    dtype_size = np.dtype(thetype).itemsize
+    # Create a shared memory block of the required size
+    shm = shared_memory.SharedMemory(create=True, size=thesize * dtype_size)
+    outarray = np.ndarray(theshape, dtype=thetype, buffer=shm.buf)
+    return outarray, shm  # Return both the array and the shared memory object
+
+
+def cleanup_shm(shm):
+    # Cleanup
+    if shm is not None:
+        shm.close()
+        shm.unlink()
diff --git a/rapidtide/workflows/rapidtide.py b/rapidtide/workflows/rapidtide.py
index 69d334cd..2cb803fd 100755
--- a/rapidtide/workflows/rapidtide.py
+++ b/rapidtide/workflows/rapidtide.py
@@ -796,7 +796,9 @@ def rapidtide_main(argparsingfunc):
        tide_util.numpy2shared_func = addmemprofiling(
            tide_util.numpy2shared, optiondict["memprofile"], "before fmri data move"
        )
-        fmri_data_valid = tide_util.numpy2shared_func(fmri_data_valid, rt_floatset)
+        fmri_data_valid, fmri_data_valid_shm = tide_util.numpy2shared_func(
+            fmri_data_valid, rt_floatset
+        )
        TimingLGR.verbose("End moving fmri_data to shared memory")

    # read in any motion and/or other confound regressors here
@@ -1605,10 +1607,10 @@ def rapidtide_main(argparsingfunc):
        f"allocating memory for correlation arrays {internalcorrshape} {internalvalidcorrshape}"
    )
    if optiondict["sharedmem"]:
-        corrout, dummy, dummy = tide_util.allocshared(internalvalidcorrshape, rt_floatset)
-        gaussout, dummy, dummy = tide_util.allocshared(internalvalidcorrshape, rt_floatset)
-        windowout, dummy, dummy = tide_util.allocshared(internalvalidcorrshape, rt_floatset)
-        outcorrarray, dummy, dummy = tide_util.allocshared(internalcorrshape, rt_floatset)
+        corrout, corrout_shm = tide_util.allocshared(internalvalidcorrshape, rt_floatset)
+        gaussout, gaussout_shm = tide_util.allocshared(internalvalidcorrshape, rt_floatset)
+        windowout, windowout_shm = tide_util.allocshared(internalvalidcorrshape, rt_floatset)
+        outcorrarray, outcorrarray_shm = tide_util.allocshared(internalcorrshape, rt_floatset)
    else:
        corrout = np.zeros(internalvalidcorrshape, dtype=rt_floattype)
        gaussout = np.zeros(internalvalidcorrshape, dtype=rt_floattype)
@@ -1653,27 +1655,23 @@ def rapidtide_main(argparsingfunc):
        or optiondict["convergencethresh"] is not None
    ):
        if optiondict["sharedmem"]:
-            shiftedtcs, dummy, dummy = tide_util.allocshared(internalvalidfmrishape, rt_floatset)
-            weights, dummy, dummy = tide_util.allocshared(internalvalidfmrishape, rt_floatset)
-            paddedshiftedtcs, dummy, dummy = tide_util.allocshared(
+            shiftedtcs, shiftedtcs_shm = tide_util.allocshared(internalvalidfmrishape, rt_floatset)
+            weights, weights_shm = tide_util.allocshared(internalvalidfmrishape, rt_floatset)
+            paddedshiftedtcs, paddedshiftedtcs_shm = tide_util.allocshared(
                internalvalidpaddedfmrishape, rt_floatset
            )
-            paddedweights, dummy, dummy = tide_util.allocshared(
+            paddedweights, paddedweights_shm = tide_util.allocshared(
                internalvalidpaddedfmrishape, rt_floatset
            )
        else:
            shiftedtcs = np.zeros(internalvalidfmrishape, dtype=rt_floattype)
            weights = np.zeros(internalvalidfmrishape, dtype=rt_floattype)
-            paddedshiftedtcs, dummy, dummy = tide_util.allocshared(
-                internalvalidpaddedfmrishape, rt_floatset
-            )
-            paddedweights, dummy, dummy = tide_util.allocshared(
-                internalvalidpaddedfmrishape, rt_floatset
-            )
+            paddedshiftedtcs = np.zeros(internalvalidpaddedfmrishape, dtype=rt_floattype)
+            paddedweights = np.zeros(internalvalidpaddedfmrishape, dtype=rt_floattype)
    tide_util.logmem("after refinement array allocation")

    if optiondict["sharedmem"]:
-        outfmriarray, dummy, dummy = tide_util.allocshared(internalfmrishape, rt_floatset)
+        outfmriarray, outfmriarray_shm = tide_util.allocshared(internalfmrishape, rt_floatset)
    else:
        outfmriarray = np.zeros(internalfmrishape, dtype=rt_floattype)

@@ -2986,20 +2984,18 @@ def rapidtide_main(argparsingfunc):
    # now allocate the arrays needed for the coherence calculation
    if optiondict["sharedmem"]:
-        coherencefunc, dummy, dummy = tide_util.allocshared(
+        coherencefunc, coherencefunc_shm = tide_util.allocshared(
            internalvalidcoherenceshape, rt_outfloatset
        )
-        coherencepeakval, dummy, dummy = tide_util.allocshared(
+        coherencepeakval, coherencepeakval_shm = tide_util.allocshared(
            numvalidspatiallocs, rt_outfloatset
        )
-        coherencepeakfreq, dummy, dummy = tide_util.allocshared(
+        coherencepeakfreq, coherencepeakfreq_shm = tide_util.allocshared(
            numvalidspatiallocs, rt_outfloatset
        )
    else:
        coherencefunc = np.zeros(internalvalidcoherenceshape, dtype=rt_outfloattype)
-        coherencepeakval, dummy, dummy = tide_util.allocshared(
-            numvalidspatiallocs, rt_outfloatset
-        )
+        coherencepeakval = np.zeros(numvalidspatiallocs, dtype=rt_outfloattype)
        coherencepeakfreq = np.zeros(numvalidspatiallocs, dtype=rt_outfloattype)

    coherencepass_func = addmemprofiling(
@@ -3068,10 +3064,10 @@ def rapidtide_main(argparsingfunc):
        # now allocate the arrays needed for Wiener deconvolution
        if optiondict["sharedmem"]:
-            wienerdeconv, dummy, dummy = tide_util.allocshared(
+            wienerdeconv, wienerdeconv_shm = tide_util.allocshared(
                internalvalidspaceshape, rt_outfloatset
            )
-            wpeak, dummy, dummy = tide_util.allocshared(internalvalidspaceshape, rt_outfloatset)
+            wpeak, wpeak_shm = tide_util.allocshared(internalvalidspaceshape, rt_outfloatset)
        else:
            wienerdeconv = np.zeros(internalvalidspaceshape, dtype=rt_outfloattype)
            wpeak = np.zeros(internalvalidspaceshape, dtype=rt_outfloattype)
@@ -3171,7 +3167,9 @@ def rapidtide_main(argparsingfunc):
                optiondict["memprofile"],
                "before movetoshared (glm)",
            )
-            fmri_data_valid = numpy2shared_func(fmri_data_valid, rt_floatset)
+            fmri_data_valid, fmri_data_valid_shm = numpy2shared_func(
+                fmri_data_valid, rt_floatset
+            )
            TimingLGR.info("End moving fmri_data to shared memory")
            del nim_data

@@ -3181,20 +3179,20 @@ def rapidtide_main(argparsingfunc):
            optiondict["glmderivs"] + 1,
        )
        if optiondict["sharedmem"]:
-            glmmean, dummy, dummy = tide_util.allocshared(internalvalidspaceshape, rt_outfloatset)
-            rvalue, dummy, dummy = tide_util.allocshared(internalvalidspaceshape, rt_outfloatset)
-            r2value, dummy, dummy = tide_util.allocshared(internalvalidspaceshape, rt_outfloatset)
-            fitNorm, dummy, dummy = tide_util.allocshared(
+            glmmean, glmmean_shm = tide_util.allocshared(internalvalidspaceshape, rt_outfloatset)
+            rvalue, rvalue_shm = tide_util.allocshared(internalvalidspaceshape, rt_outfloatset)
+            r2value, r2value_shm = tide_util.allocshared(internalvalidspaceshape, rt_outfloatset)
+            fitNorm, fitNorm_shm = tide_util.allocshared(
                internalvalidspaceshapederivs, rt_outfloatset
            )
-            fitcoeff, dummy, dummy = tide_util.allocshared(
+            fitcoeff, fitcoeff_shm = tide_util.allocshared(
                internalvalidspaceshapederivs, rt_outfloatset
            )
-            movingsignal, dummy, dummy = tide_util.allocshared(
+            movingsignal, movingsignal_shm = tide_util.allocshared(
                internalvalidfmrishape, rt_outfloatset
            )
-            lagtc, dummy, dummy = tide_util.allocshared(internalvalidfmrishape, rt_floatset)
-            filtereddata, dummy, dummy = tide_util.allocshared(
+            lagtc, lagtc_shm = tide_util.allocshared(internalvalidfmrishape, rt_floatset)
+            filtereddata, filtereddata_shm = tide_util.allocshared(
                internalvalidfmrishape, rt_outfloatset
            )
        else:
@@ -3300,6 +3298,8 @@ def rapidtide_main(argparsingfunc):
        varchange = initialvariance * 0.0
        varchange[divlocs] = 100.0 * (finalvariance[divlocs] / initialvariance[divlocs] - 1.0)
        del fmri_data_valid
+        if optiondict["sharedmem"]:
+            tide_util.cleanup_shm(fmri_data_valid_shm)

        LGR.info("End filtering operation")
        TimingLGR.info(
@@ -3607,6 +3607,7 @@ def rapidtide_main(argparsingfunc):
            rt_floattype=rt_floattype,
            cifti_hdr=cifti_hdr,
        )
+
        del glmmean
        del rvalue
        del r2value
        del fitcoeff
@@ -3614,6 +3615,12 @@ def rapidtide_main(argparsingfunc):
        del initialvariance
        del finalvariance
        del varchange
+        if optiondict["sharedmem"]:
+            tide_util.cleanup_shm(glmmean_shm)
+            tide_util.cleanup_shm(rvalue_shm)
+            tide_util.cleanup_shm(r2value_shm)
+            tide_util.cleanup_shm(fitcoeff_shm)
+            tide_util.cleanup_shm(fitNorm_shm)

    # write the 3D maps that don't need to be remapped
    maplist = [
diff --git a/rapidtide/workflows/rapidtide_parser.py b/rapidtide/workflows/rapidtide_parser.py
index 2b1ae3fc..f1a312ea 100755
--- a/rapidtide/workflows/rapidtide_parser.py
+++ b/rapidtide/workflows/rapidtide_parser.py
@@ -1371,9 +1371,7 @@ def _get_parser():
        "--premask",
        dest="premask",
        action="store_true",
-        help=(
-            "Apply masking prior to spatial filtering to limit delay calculation to brain tissue only."
-        ),
+        help=("Apply masking prior to spatial filtering to limit extracerebral sources."),
        default=False,
    )
    experimental.add_argument(
diff --git a/rapidtide/workflows/retroglm.py b/rapidtide/workflows/retroglm.py
index 2eebec14..2565efda 100644
--- a/rapidtide/workflows/retroglm.py
+++ b/rapidtide/workflows/retroglm.py
@@ -322,18 +322,20 @@ def retroglm(args):
    if usesharedmem:
        if args.debug:
            print("allocating shared memory")
-        glmmean, dummy, dummy = tide_util.allocshared(internalvalidspaceshape, rt_outfloatset)
-        rvalue, dummy, dummy = tide_util.allocshared(internalvalidspaceshape, rt_outfloatset)
-        r2value, dummy, dummy = tide_util.allocshared(internalvalidspaceshape, rt_outfloatset)
-        fitNorm, dummy, dummy = tide_util.allocshared(
+        glmmean, glmmean_shm = tide_util.allocshared(internalvalidspaceshape, rt_outfloatset)
+        rvalue, rvalue_shm = tide_util.allocshared(internalvalidspaceshape, rt_outfloatset)
+        r2value, r2value_shm = tide_util.allocshared(internalvalidspaceshape, rt_outfloatset)
+        fitNorm, fitNorm_shm = tide_util.allocshared(internalvalidspaceshapederivs, rt_outfloatset)
+        fitcoeff, fitcoeff_shm = tide_util.allocshared(
            internalvalidspaceshapederivs, rt_outfloatset
        )
-        fitcoeff, dummy, dummy = tide_util.allocshared(
-            internalvalidspaceshapederivs, rt_outfloatset
+        movingsignal, movingsignal_shm = tide_util.allocshared(
+            internalvalidfmrishape, rt_outfloatset
+        )
+        lagtc, lagtc_shm = tide_util.allocshared(internalvalidfmrishape, rt_floatset)
+        filtereddata, filtereddata_shm = tide_util.allocshared(
+            internalvalidfmrishape, rt_outfloatset
        )
-        movingsignal, dummy, dummy = tide_util.allocshared(internalvalidfmrishape, rt_outfloatset)
-        lagtc, dummy, dummy = tide_util.allocshared(internalvalidfmrishape, rt_floatset)
-        filtereddata, dummy, dummy = tide_util.allocshared(internalvalidfmrishape, rt_outfloatset)
    else:
        if args.debug:
            print("allocating memory")
diff --git a/rapidtide/workflows/retrolagtcs.py b/rapidtide/workflows/retrolagtcs.py
index b9c3dd0e..6516b572 100644
--- a/rapidtide/workflows/retrolagtcs.py
+++ b/rapidtide/workflows/retrolagtcs.py
@@ -223,13 +223,11 @@ def retrolagtcs(args):
    if usesharedmem:
        if args.debug:
            print("allocating shared memory")
-        fitNorm, dummy, dummy = tide_util.allocshared(
+        fitNorm, fitNorm_shm = tide_util.allocshared(internalvalidspaceshapederivs, rt_outfloatset)
+        fitcoeff, fitcoeff_shm = tide_util.allocshared(
            internalvalidspaceshapederivs, rt_outfloatset
        )
-        fitcoeff, dummy, dummy = tide_util.allocshared(
-            internalvalidspaceshapederivs, rt_outfloatset
-        )
-        lagtc, dummy, dummy = tide_util.allocshared(internalvalidfmrishape, rt_floatset)
+        lagtc, lagtc_shm = tide_util.allocshared(internalvalidfmrishape, rt_floatset)
    else:
        if args.debug:
            print("allocating memory")
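
For reference, a minimal usage sketch of the new shared-memory helpers added to rapidtide/util.py in this diff (illustrative only; the array shape and variable names below are made up for the example, not taken from the rapidtide code):

    import numpy as np

    import rapidtide.util as tide_util

    # Move an existing array into shared memory; keep the SharedMemory handle.
    data = np.zeros((10, 100), dtype=np.float64)
    shared_data, shared_data_shm = tide_util.numpy2shared(data, np.float64)

    # Allocate a fresh shared array for results.
    results, results_shm = tide_util.allocshared((10, 100), np.float64)

    # ... hand shared_data and results to worker processes here ...

    # Unlike the old RawArray-based helpers, SharedMemory blocks must be
    # released explicitly once they are no longer needed.
    tide_util.cleanup_shm(shared_data_shm)
    tide_util.cleanup_shm(results_shm)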