Skip to content

Commit

Permalink
Replace memmap usage with direct file I/O (#82)
Browse files Browse the repository at this point in the history
* Replace memmap usage with direct file I/O

Previously, the library made extensive usage of memory-map to copy the
contents of inputs & outputs to & from binary files for compatibility
with the SNAPHU executable. Memory-mapping is convenient for copying 2-D
blocks of data to ensure that we don't run out of memory while copying
entire large datasets.

However, there seems to be no interface in NumPy or Python's `mmap`
library to ensure that the memory-mapped file is safely closed (at least
prior to Python 3.13 -- see
python/cpython#78502). This may cause issues
when unwrapping a large series of interferograms in a single process,
potentially leading to the number of open file descriptors exceeding the
system's resource limits.

In this update, we replace usage of `numpy.memmap` with direct file I/O.
In order to avoid undue complexity (as well as potential performance
issues due to buffered file access) we now read/write batches of data
that span all columns of the input/output 2-D datasets, rather than
operating on 2-D sub-blocks of data. This new approach gives full
control over when files are opened and closed, in order to avoid leaking
resources.

* Close files opened by `tempfile.mkstemp`

Add a wrapper for `tempfile.mkstemp` that ensures that open files are
properly closed. Replace usage of `tempfile.mkstemp` with the wrapper
function.
  • Loading branch information
gmgunter authored Sep 16, 2024
1 parent b21dd68 commit 621cfe5
Show file tree
Hide file tree
Showing 4 changed files with 324 additions and 299 deletions.
58 changes: 28 additions & 30 deletions src/snaphu/_conncomp.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
import textwrap
from pathlib import Path
from tempfile import mkstemp
from typing import overload

import numpy as np
Expand All @@ -16,7 +15,13 @@
check_integer_dtype,
)
from ._snaphu import run_snaphu
from ._util import copy_blockwise, nan_to_zero, scratch_directory
from ._util import (
nan_to_zero,
new_unique_file,
read_from_file,
scratch_directory,
write_to_file,
)
from .io import InputDataset, OutputDataset

__all__ = [
Expand Down Expand Up @@ -101,8 +106,8 @@ def regrow_conncomp_from_unw(

# Write config parameters to file. The config file should have a descriptive name to
# disambiguate it from the config file used for unwrapping.
_, config_file = mkstemp(
dir=scratchdir, prefix="snaphu-regrow-conncomps.config.", suffix=".txt"
config_file = new_unique_file(
dir_=scratchdir, prefix="snaphu-regrow-conncomps.config.", suffix=".txt"
)
Path(config_file).write_text(config)

Expand Down Expand Up @@ -273,56 +278,49 @@ def grow_conncomps( # type: ignore[no-untyped-def]
# Create a raw binary file in the scratch directory for the unwrapped phase and
# copy the input data to it. (`mkstemp` is used to avoid data races in case the
# same scratch directory was used for multiple SNAPHU processes.)
_, tmp_unw = mkstemp(dir=dir_, prefix="snaphu.unw.", suffix=".f4")
tmp_unw_mmap = np.memmap(tmp_unw, dtype=np.float32, shape=unw.shape)
copy_blockwise(unw, tmp_unw_mmap, transform=nan_to_zero)
tmp_unw_mmap.flush()
unw_file = new_unique_file(dir_=dir_, prefix="snaphu.unw.", suffix=".f4")
write_to_file(unw, unw_file, transform=nan_to_zero, dtype=np.float32)

# Copy the input coherence data to a raw binary file in the scratch directory.
_, tmp_corr = mkstemp(dir=dir_, prefix="snaphu.corr.", suffix=".f4")
tmp_corr_mmap = np.memmap(tmp_corr, dtype=np.float32, shape=corr.shape)
copy_blockwise(corr, tmp_corr_mmap, transform=nan_to_zero)
tmp_corr_mmap.flush()
corr_file = new_unique_file(dir_=dir_, prefix="snaphu.corr.", suffix=".f4")
write_to_file(corr, corr_file, transform=nan_to_zero, dtype=np.float32)

# If magnitude data was provided, copy it to a raw binary file in the scratch
# directory.
if mag is None:
tmp_mag = None
mag_file = None
else:
_, tmp_mag = mkstemp(dir=dir_, prefix="snaphu.mag.", suffix=".f4")
tmp_mag_mmap = np.memmap(tmp_mag, dtype=np.float32, shape=mag.shape)
copy_blockwise(mag, tmp_mag_mmap, transform=nan_to_zero)
tmp_mag_mmap.flush()
mag_file = new_unique_file(dir_=dir_, prefix="snaphu.mag.", suffix=".f4")
write_to_file(mag, mag_file, transform=nan_to_zero, dtype=np.float32)

# If a mask was provided, copy the mask data to a raw binary file in the scratch
# directory.
if mask is None:
tmp_mask = None
mask_file = None
else:
_, tmp_mask = mkstemp(dir=dir_, prefix="snaphu.mask.", suffix=".u1")
tmp_mask_mmap = np.memmap(tmp_mask, dtype=np.bool_, shape=mask.shape)
copy_blockwise(mask, tmp_mask_mmap)
tmp_mask_mmap.flush()
mask_file = new_unique_file(dir_=dir_, prefix="snaphu.mask.", suffix=".u1")
write_to_file(mask, mask_file, dtype=np.bool_)

# Create a raw file in the scratch directory for the output connected
# components.
_, tmp_conncomp = mkstemp(dir=dir_, prefix="snaphu.conncomp.", suffix=".u4")
conncomp_file = new_unique_file(
dir_=dir_, prefix="snaphu.conncomp.", suffix=".u4"
)

regrow_conncomp_from_unw(
unw_file=tmp_unw,
corr_file=tmp_corr,
conncomp_file=tmp_conncomp,
unw_file=unw_file,
corr_file=corr_file,
conncomp_file=conncomp_file,
line_length=unw.shape[1],
nlooks=nlooks,
cost=cost,
mag_file=tmp_mag,
mask_file=tmp_mask,
mag_file=mag_file,
mask_file=mask_file,
min_conncomp_frac=min_conncomp_frac,
scratchdir=dir_,
)

# Get the output connected component labels.
tmp_cc_mmap = np.memmap(tmp_conncomp, dtype=np.uint32, shape=conncomp.shape)
copy_blockwise(tmp_cc_mmap, conncomp)
read_from_file(conncomp, conncomp_file, dtype=np.uint32)

return conncomp
73 changes: 34 additions & 39 deletions src/snaphu/_unwrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
import textwrap
from pathlib import Path
from tempfile import mkstemp
from typing import cast, overload

import numpy as np
Expand All @@ -19,7 +18,13 @@
)
from ._conncomp import regrow_conncomp_from_unw
from ._snaphu import run_snaphu
from ._util import copy_blockwise, nan_to_zero, scratch_directory
from ._util import (
nan_to_zero,
new_unique_file,
read_from_file,
scratch_directory,
write_to_file,
)
from .io import InputDataset, OutputDataset

__all__ = [
Expand Down Expand Up @@ -331,40 +336,36 @@ def unwrap( # type: ignore[no-untyped-def]
# Create a raw binary file in the scratch directory for the interferogram and
# copy the input data to it. (`mkstemp` is used to avoid data races in case the
# same scratch directory was used for multiple SNAPHU processes.)
_, tmp_igram = mkstemp(dir=dir_, prefix="snaphu.igram.", suffix=".c8")
tmp_igram_mmap = np.memmap(tmp_igram, dtype=np.complex64, shape=igram.shape)
copy_blockwise(igram, tmp_igram_mmap, transform=nan_to_zero)
tmp_igram_mmap.flush()
igram_file = new_unique_file(dir_=dir_, prefix="snaphu.igram.", suffix=".c8")
write_to_file(igram, igram_file, transform=nan_to_zero, dtype=np.complex64)

# Copy the input coherence data to a raw binary file in the scratch directory.
_, tmp_corr = mkstemp(dir=dir_, prefix="snaphu.corr.", suffix=".f4")
tmp_corr_mmap = np.memmap(tmp_corr, dtype=np.float32, shape=corr.shape)
copy_blockwise(corr, tmp_corr_mmap, transform=nan_to_zero)
tmp_corr_mmap.flush()
corr_file = new_unique_file(dir_=dir_, prefix="snaphu.corr.", suffix=".f4")
write_to_file(corr, corr_file, transform=nan_to_zero, dtype=np.float32)

# If a mask was provided, copy the mask data to a raw binary file in the scratch
# directory.
if mask is None:
tmp_mask = None
mask_file = None
else:
_, tmp_mask = mkstemp(dir=dir_, prefix="snaphu.mask.", suffix=".u1")
tmp_mask_mmap = np.memmap(tmp_mask, dtype=np.bool_, shape=mask.shape)
copy_blockwise(mask, tmp_mask_mmap)
tmp_mask_mmap.flush()
mask_file = new_unique_file(dir_=dir_, prefix="snaphu.mask.", suffix=".u1")
write_to_file(mask, mask_file, dtype=np.bool_)

# Create files in the scratch directory for SNAPHU outputs.
_, tmp_unw = mkstemp(dir=dir_, prefix="snaphu.unw.", suffix=".f4")
_, tmp_conncomp = mkstemp(dir=dir_, prefix="snaphu.conncomp.", suffix=".u4")
unw_file = new_unique_file(dir_=dir_, prefix="snaphu.unw.", suffix=".f4")
conncomp_file = new_unique_file(
dir_=dir_, prefix="snaphu.conncomp.", suffix=".u4"
)

config = textwrap.dedent(
f"""\
INFILE {tmp_igram}
INFILE {igram_file}
INFILEFORMAT COMPLEX_DATA
CORRFILE {tmp_corr}
CORRFILE {corr_file}
CORRFILEFORMAT FLOAT_DATA
OUTFILE {tmp_unw}
OUTFILE {unw_file}
OUTFILEFORMAT FLOAT_DATA
CONNCOMPFILE {tmp_conncomp}
CONNCOMPFILE {conncomp_file}
CONNCOMPOUTTYPE UINT
LINELENGTH {igram.shape[1]}
NCORRLOOKS {nlooks}
Expand All @@ -383,7 +384,7 @@ def unwrap( # type: ignore[no-untyped-def]
"""
)
if mask is not None:
config += f"BYTEMASKFILE {tmp_mask}\n"
config += f"BYTEMASKFILE {mask_file}\n"

# Optionally re-optimize the unwrapped phase using a single tile after
# unwrapping in tiled mode. This step should have no effect when running in
Expand All @@ -393,7 +394,7 @@ def unwrap( # type: ignore[no-untyped-def]
config += "SINGLETILEREOPTIMIZE TRUE\n"

# Write config parameters to file.
_, config_file = mkstemp(dir=dir_, prefix="snaphu.config.", suffix=".txt")
config_file = new_unique_file(dir_=dir_, prefix="snaphu.config.", suffix=".txt")
Path(config_file).write_text(config)

# Run SNAPHU with the specified parameters.
Expand All @@ -409,33 +410,27 @@ def unwrap( # type: ignore[no-untyped-def]
# for example, to detect zero-magnitude pixels which should be masked out
# (i.e. connected component label set to 0). So compute the interferogram
# magnitude and pass it as a separate input file.
_, tmp_mag = mkstemp(dir=dir_, prefix="snaphu.mag.", suffix=".f4")
tmp_mag_mmap = np.memmap(tmp_mag, dtype=np.float32, shape=igram.shape)
copy_blockwise(tmp_igram_mmap, tmp_mag_mmap, transform=np.abs)
tmp_mag_mmap.flush()
mag_file = new_unique_file(dir_=dir_, prefix="snaphu.mag.", suffix=".f4")
write_to_file(igram, mag_file, transform=np.abs, dtype=np.float32)

# Re-run SNAPHU to compute new connected components from the unwrapped phase
# as though in single-tile mode, overwriting the original connected
# components file.
regrow_conncomp_from_unw(
unw_file=tmp_unw,
corr_file=tmp_corr,
conncomp_file=tmp_conncomp,
unw_file=unw_file,
corr_file=corr_file,
conncomp_file=conncomp_file,
line_length=igram.shape[1],
nlooks=nlooks,
cost=cost,
mag_file=tmp_mag,
mask_file=tmp_mask,
mag_file=mag_file,
mask_file=mask_file,
min_conncomp_frac=min_conncomp_frac,
scratchdir=dir_,
)

# Get the output unwrapped phase data.
tmp_unw_mmap = np.memmap(tmp_unw, dtype=np.float32, shape=unw.shape)
copy_blockwise(tmp_unw_mmap, unw)

# Get the output connected component labels.
tmp_cc_mmap = np.memmap(tmp_conncomp, dtype=np.uint32, shape=conncomp.shape)
copy_blockwise(tmp_cc_mmap, conncomp)
# Get the output unwrapped phase and connected component labels.
read_from_file(unw, unw_file, dtype=np.float32)
read_from_file(conncomp, conncomp_file, dtype=np.uint32)

return unw, conncomp
Loading

0 comments on commit 621cfe5

Please sign in to comment.