Skip to content

Commit

Permalink
Replace memmap usage with direct file I/O
Browse files Browse the repository at this point in the history
Previously, the library made extensive usage of memory-map to copy the
contents of inputs & outputs to & from binary files for compatibility
with the SNAPHU executable. Memory-mapping is convenient for copying 2-D
blocks of data to ensure that we don't run out of memory while copying
entire large datasets.

However, there seems to be no interface in NumPy or Python's `mmap`
library to ensure that the memory-mapped file is safely closed (at least
prior to Python 3.13 -- see
python/cpython#78502). This may cause issues
when unwrapping a large series of interferograms in a single process,
potentially leading to the number of open file descriptors exceeding the
system's resource limits.

In this update, we replace usage of `numpy.memmap` with direct file I/O.
In order to avoid undue complexity (as well as potential performance
issues due to buffered file access) we now read/write batches of data
that span all columns of the input/output 2-D datasets, rather than
operating on 2-D sub-blocks of data. This new approach gives full
control over when files are opened and closed, in order to avoid leaking
resources.
  • Loading branch information
gmgunter committed Sep 13, 2024
1 parent b21dd68 commit dfb2cb6
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 299 deletions.
45 changes: 18 additions & 27 deletions src/snaphu/_conncomp.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
check_integer_dtype,
)
from ._snaphu import run_snaphu
from ._util import copy_blockwise, nan_to_zero, scratch_directory
from ._util import nan_to_zero, read_from_file, scratch_directory, write_to_file
from .io import InputDataset, OutputDataset

__all__ = [
Expand Down Expand Up @@ -273,56 +273,47 @@ def grow_conncomps( # type: ignore[no-untyped-def]
# Create a raw binary file in the scratch directory for the unwrapped phase and
# copy the input data to it. (`mkstemp` is used to avoid data races in case the
# same scratch directory was used for multiple SNAPHU processes.)
_, tmp_unw = mkstemp(dir=dir_, prefix="snaphu.unw.", suffix=".f4")
tmp_unw_mmap = np.memmap(tmp_unw, dtype=np.float32, shape=unw.shape)
copy_blockwise(unw, tmp_unw_mmap, transform=nan_to_zero)
tmp_unw_mmap.flush()
_, unw_file = mkstemp(dir=dir_, prefix="snaphu.unw.", suffix=".f4")
write_to_file(unw, unw_file, transform=nan_to_zero, dtype=np.float32)

# Copy the input coherence data to a raw binary file in the scratch directory.
_, tmp_corr = mkstemp(dir=dir_, prefix="snaphu.corr.", suffix=".f4")
tmp_corr_mmap = np.memmap(tmp_corr, dtype=np.float32, shape=corr.shape)
copy_blockwise(corr, tmp_corr_mmap, transform=nan_to_zero)
tmp_corr_mmap.flush()
_, corr_file = mkstemp(dir=dir_, prefix="snaphu.corr.", suffix=".f4")
write_to_file(corr, corr_file, transform=nan_to_zero, dtype=np.float32)

# If magnitude data was provided, copy it to a raw binary file in the scratch
# directory.
if mag is None:
tmp_mag = None
mag_file = None
else:
_, tmp_mag = mkstemp(dir=dir_, prefix="snaphu.mag.", suffix=".f4")
tmp_mag_mmap = np.memmap(tmp_mag, dtype=np.float32, shape=mag.shape)
copy_blockwise(mag, tmp_mag_mmap, transform=nan_to_zero)
tmp_mag_mmap.flush()
_, mag_file = mkstemp(dir=dir_, prefix="snaphu.mag.", suffix=".f4")
write_to_file(mag, mag_file, transform=nan_to_zero, dtype=np.float32)

# If a mask was provided, copy the mask data to a raw binary file in the scratch
# directory.
if mask is None:
tmp_mask = None
mask_file = None
else:
_, tmp_mask = mkstemp(dir=dir_, prefix="snaphu.mask.", suffix=".u1")
tmp_mask_mmap = np.memmap(tmp_mask, dtype=np.bool_, shape=mask.shape)
copy_blockwise(mask, tmp_mask_mmap)
tmp_mask_mmap.flush()
_, mask_file = mkstemp(dir=dir_, prefix="snaphu.mask.", suffix=".u1")
write_to_file(mask, mask_file, dtype=np.bool_)

# Create a raw file in the scratch directory for the output connected
# components.
_, tmp_conncomp = mkstemp(dir=dir_, prefix="snaphu.conncomp.", suffix=".u4")
_, conncomp_file = mkstemp(dir=dir_, prefix="snaphu.conncomp.", suffix=".u4")

regrow_conncomp_from_unw(
unw_file=tmp_unw,
corr_file=tmp_corr,
conncomp_file=tmp_conncomp,
unw_file=unw_file,
corr_file=corr_file,
conncomp_file=conncomp_file,
line_length=unw.shape[1],
nlooks=nlooks,
cost=cost,
mag_file=tmp_mag,
mask_file=tmp_mask,
mag_file=mag_file,
mask_file=mask_file,
min_conncomp_frac=min_conncomp_frac,
scratchdir=dir_,
)

# Get the output connected component labels.
tmp_cc_mmap = np.memmap(tmp_conncomp, dtype=np.uint32, shape=conncomp.shape)
copy_blockwise(tmp_cc_mmap, conncomp)
read_from_file(conncomp, conncomp_file, dtype=np.uint32)

return conncomp
62 changes: 25 additions & 37 deletions src/snaphu/_unwrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
)
from ._conncomp import regrow_conncomp_from_unw
from ._snaphu import run_snaphu
from ._util import copy_blockwise, nan_to_zero, scratch_directory
from ._util import nan_to_zero, read_from_file, scratch_directory, write_to_file
from .io import InputDataset, OutputDataset

__all__ = [
Expand Down Expand Up @@ -331,40 +331,34 @@ def unwrap( # type: ignore[no-untyped-def]
# Create a raw binary file in the scratch directory for the interferogram and
# copy the input data to it. (`mkstemp` is used to avoid data races in case the
# same scratch directory was used for multiple SNAPHU processes.)
_, tmp_igram = mkstemp(dir=dir_, prefix="snaphu.igram.", suffix=".c8")
tmp_igram_mmap = np.memmap(tmp_igram, dtype=np.complex64, shape=igram.shape)
copy_blockwise(igram, tmp_igram_mmap, transform=nan_to_zero)
tmp_igram_mmap.flush()
_, igram_file = mkstemp(dir=dir_, prefix="snaphu.igram.", suffix=".c8")
write_to_file(igram, igram_file, transform=nan_to_zero, dtype=np.complex64)

# Copy the input coherence data to a raw binary file in the scratch directory.
_, tmp_corr = mkstemp(dir=dir_, prefix="snaphu.corr.", suffix=".f4")
tmp_corr_mmap = np.memmap(tmp_corr, dtype=np.float32, shape=corr.shape)
copy_blockwise(corr, tmp_corr_mmap, transform=nan_to_zero)
tmp_corr_mmap.flush()
_, corr_file = mkstemp(dir=dir_, prefix="snaphu.corr.", suffix=".f4")
write_to_file(corr, corr_file, transform=nan_to_zero, dtype=np.float32)

# If a mask was provided, copy the mask data to a raw binary file in the scratch
# directory.
if mask is None:
tmp_mask = None
mask_file = None
else:
_, tmp_mask = mkstemp(dir=dir_, prefix="snaphu.mask.", suffix=".u1")
tmp_mask_mmap = np.memmap(tmp_mask, dtype=np.bool_, shape=mask.shape)
copy_blockwise(mask, tmp_mask_mmap)
tmp_mask_mmap.flush()
_, mask_file = mkstemp(dir=dir_, prefix="snaphu.mask.", suffix=".u1")
write_to_file(mask, mask_file, dtype=np.bool_)

# Create files in the scratch directory for SNAPHU outputs.
_, tmp_unw = mkstemp(dir=dir_, prefix="snaphu.unw.", suffix=".f4")
_, tmp_conncomp = mkstemp(dir=dir_, prefix="snaphu.conncomp.", suffix=".u4")
_, unw_file = mkstemp(dir=dir_, prefix="snaphu.unw.", suffix=".f4")
_, conncomp_file = mkstemp(dir=dir_, prefix="snaphu.conncomp.", suffix=".u4")

config = textwrap.dedent(
f"""\
INFILE {tmp_igram}
INFILE {igram_file}
INFILEFORMAT COMPLEX_DATA
CORRFILE {tmp_corr}
CORRFILE {corr_file}
CORRFILEFORMAT FLOAT_DATA
OUTFILE {tmp_unw}
OUTFILE {unw_file}
OUTFILEFORMAT FLOAT_DATA
CONNCOMPFILE {tmp_conncomp}
CONNCOMPFILE {conncomp_file}
CONNCOMPOUTTYPE UINT
LINELENGTH {igram.shape[1]}
NCORRLOOKS {nlooks}
Expand All @@ -383,7 +377,7 @@ def unwrap( # type: ignore[no-untyped-def]
"""
)
if mask is not None:
config += f"BYTEMASKFILE {tmp_mask}\n"
config += f"BYTEMASKFILE {mask_file}\n"

# Optionally re-optimize the unwrapped phase using a single tile after
# unwrapping in tiled mode. This step should have no effect when running in
Expand All @@ -409,33 +403,27 @@ def unwrap( # type: ignore[no-untyped-def]
# for example, to detect zero-magnitude pixels which should be masked out
# (i.e. connected component label set to 0). So compute the interferogram
# magnitude and pass it as a separate input file.
_, tmp_mag = mkstemp(dir=dir_, prefix="snaphu.mag.", suffix=".f4")
tmp_mag_mmap = np.memmap(tmp_mag, dtype=np.float32, shape=igram.shape)
copy_blockwise(tmp_igram_mmap, tmp_mag_mmap, transform=np.abs)
tmp_mag_mmap.flush()
_, mag_file = mkstemp(dir=dir_, prefix="snaphu.mag.", suffix=".f4")
write_to_file(igram, mag_file, transform=np.abs, dtype=np.float32)

Check warning on line 407 in src/snaphu/_unwrap.py

View check run for this annotation

Codecov / codecov/patch

src/snaphu/_unwrap.py#L406-L407

Added lines #L406 - L407 were not covered by tests

# Re-run SNAPHU to compute new connected components from the unwrapped phase
# as though in single-tile mode, overwriting the original connected
# components file.
regrow_conncomp_from_unw(
unw_file=tmp_unw,
corr_file=tmp_corr,
conncomp_file=tmp_conncomp,
unw_file=unw_file,
corr_file=corr_file,
conncomp_file=conncomp_file,
line_length=igram.shape[1],
nlooks=nlooks,
cost=cost,
mag_file=tmp_mag,
mask_file=tmp_mask,
mag_file=mag_file,
mask_file=mask_file,
min_conncomp_frac=min_conncomp_frac,
scratchdir=dir_,
)

# Get the output unwrapped phase data.
tmp_unw_mmap = np.memmap(tmp_unw, dtype=np.float32, shape=unw.shape)
copy_blockwise(tmp_unw_mmap, unw)

# Get the output connected component labels.
tmp_cc_mmap = np.memmap(tmp_conncomp, dtype=np.uint32, shape=conncomp.shape)
copy_blockwise(tmp_cc_mmap, conncomp)
# Get the output unwrapped phase and connected component labels.
read_from_file(unw, unw_file, dtype=np.float32)
read_from_file(conncomp, conncomp_file, dtype=np.uint32)

return unw, conncomp
Loading

0 comments on commit dfb2cb6

Please sign in to comment.