From b808ab04424c3ef5d22ce2d55edae11afa5800cf Mon Sep 17 00:00:00 2001 From: Geoffrey M Gunter Date: Wed, 11 Sep 2024 01:06:53 -0700 Subject: [PATCH] Ensure memory-mapped files are safely closed * Add `io.MMapArray`, an array_like type that uses a memory-mapped file buffer for storage * Replace usage of `numpy.memmap` with `io.MMapArray` `numpy.memmap` does not have an API that guarantees that the underlying file is closed. This could lead to a large number of file descriptors staying open for much longer than necessary. >Currently there is no API to close the underlying mmap. It is tricky to >ensure the resource is actually closed, since it may be shared between >different memmap instances. https://numpy.org/doc/stable/reference/generated/numpy.memmap.html In practice, this seems to have been the cause of errors encountered in testing due to too many open files when unwrapping multiple interferograms in parallel. ``` OSError: [Errno 24] Too many open files ``` --- src/snaphu/_conncomp.py | 26 ++--- src/snaphu/_unwrap.py | 33 +++--- src/snaphu/io/__init__.py | 2 + src/snaphu/io/__init__.pyi | 2 + src/snaphu/io/_mmap_array.py | 199 +++++++++++++++++++++++++++++++++++ 5 files changed, 230 insertions(+), 32 deletions(-) create mode 100644 src/snaphu/io/_mmap_array.py diff --git a/src/snaphu/_conncomp.py b/src/snaphu/_conncomp.py index e4a6594..225258a 100644 --- a/src/snaphu/_conncomp.py +++ b/src/snaphu/_conncomp.py @@ -17,7 +17,7 @@ ) from ._snaphu import run_snaphu from ._util import copy_blockwise, nan_to_zero, scratch_directory -from .io import InputDataset, OutputDataset +from .io import InputDataset, MMapArray, OutputDataset __all__ = [ "grow_conncomps", @@ -274,15 +274,13 @@ def grow_conncomps( # type: ignore[no-untyped-def] # copy the input data to it. (`mkstemp` is used to avoid data races in case the # same scratch directory was used for multiple SNAPHU processes.) _, tmp_unw = mkstemp(dir=dir_, prefix="snaphu.unw.", suffix=".f4") - tmp_unw_mmap = np.memmap(tmp_unw, dtype=np.float32, shape=unw.shape) - copy_blockwise(unw, tmp_unw_mmap, transform=nan_to_zero) - tmp_unw_mmap.flush() + with MMapArray(tmp_unw, shape=unw.shape, dtype=np.float32) as unw_mmap: + copy_blockwise(unw, unw_mmap, transform=nan_to_zero) # Copy the input coherence data to a raw binary file in the scratch directory. _, tmp_corr = mkstemp(dir=dir_, prefix="snaphu.corr.", suffix=".f4") - tmp_corr_mmap = np.memmap(tmp_corr, dtype=np.float32, shape=corr.shape) - copy_blockwise(corr, tmp_corr_mmap, transform=nan_to_zero) - tmp_corr_mmap.flush() + with MMapArray(tmp_corr, shape=corr.shape, dtype=np.float32) as corr_mmap: + copy_blockwise(corr, corr_mmap, transform=nan_to_zero) # If magnitude data was provided, copy it to a raw binary file in the scratch # directory. @@ -290,9 +288,8 @@ def grow_conncomps( # type: ignore[no-untyped-def] tmp_mag = None else: _, tmp_mag = mkstemp(dir=dir_, prefix="snaphu.mag.", suffix=".f4") - tmp_mag_mmap = np.memmap(tmp_mag, dtype=np.float32, shape=mag.shape) - copy_blockwise(mag, tmp_mag_mmap, transform=nan_to_zero) - tmp_mag_mmap.flush() + with MMapArray(tmp_mag, shape=mag.shape, dtype=np.float32) as mag_mmap: + copy_blockwise(mag, mag_mmap, transform=nan_to_zero) # If a mask was provided, copy the mask data to a raw binary file in the scratch # directory. @@ -300,9 +297,8 @@ def grow_conncomps( # type: ignore[no-untyped-def] tmp_mask = None else: _, tmp_mask = mkstemp(dir=dir_, prefix="snaphu.mask.", suffix=".u1") - tmp_mask_mmap = np.memmap(tmp_mask, dtype=np.bool_, shape=mask.shape) - copy_blockwise(mask, tmp_mask_mmap) - tmp_mask_mmap.flush() + with MMapArray(tmp_mask, shape=mask.shape, dtype=np.bool_) as mask_mmap: + copy_blockwise(mask, mask_mmap) # Create a raw file in the scratch directory for the output connected # components. @@ -322,7 +318,7 @@ def grow_conncomps( # type: ignore[no-untyped-def] ) # Get the output connected component labels. - tmp_cc_mmap = np.memmap(tmp_conncomp, dtype=np.uint32, shape=conncomp.shape) - copy_blockwise(tmp_cc_mmap, conncomp) + with MMapArray(tmp_conncomp, shape=conncomp.shape, dtype=np.uint32) as cc_mmap: + copy_blockwise(cc_mmap, conncomp) return conncomp diff --git a/src/snaphu/_unwrap.py b/src/snaphu/_unwrap.py index f967b58..2884521 100644 --- a/src/snaphu/_unwrap.py +++ b/src/snaphu/_unwrap.py @@ -20,7 +20,7 @@ from ._conncomp import regrow_conncomp_from_unw from ._snaphu import run_snaphu from ._util import copy_blockwise, nan_to_zero, scratch_directory -from .io import InputDataset, OutputDataset +from .io import InputDataset, MMapArray, OutputDataset __all__ = [ "unwrap", @@ -332,15 +332,13 @@ def unwrap( # type: ignore[no-untyped-def] # copy the input data to it. (`mkstemp` is used to avoid data races in case the # same scratch directory was used for multiple SNAPHU processes.) _, tmp_igram = mkstemp(dir=dir_, prefix="snaphu.igram.", suffix=".c8") - tmp_igram_mmap = np.memmap(tmp_igram, dtype=np.complex64, shape=igram.shape) - copy_blockwise(igram, tmp_igram_mmap, transform=nan_to_zero) - tmp_igram_mmap.flush() + with MMapArray(tmp_igram, shape=igram.shape, dtype=np.complex64) as ifg_mmap: + copy_blockwise(igram, ifg_mmap, transform=nan_to_zero) # Copy the input coherence data to a raw binary file in the scratch directory. _, tmp_corr = mkstemp(dir=dir_, prefix="snaphu.corr.", suffix=".f4") - tmp_corr_mmap = np.memmap(tmp_corr, dtype=np.float32, shape=corr.shape) - copy_blockwise(corr, tmp_corr_mmap, transform=nan_to_zero) - tmp_corr_mmap.flush() + with MMapArray(tmp_corr, shape=corr.shape, dtype=np.float32) as corr_mmap: + copy_blockwise(corr, corr_mmap, transform=nan_to_zero) # If a mask was provided, copy the mask data to a raw binary file in the scratch # directory. @@ -348,9 +346,8 @@ def unwrap( # type: ignore[no-untyped-def] tmp_mask = None else: _, tmp_mask = mkstemp(dir=dir_, prefix="snaphu.mask.", suffix=".u1") - tmp_mask_mmap = np.memmap(tmp_mask, dtype=np.bool_, shape=mask.shape) - copy_blockwise(mask, tmp_mask_mmap) - tmp_mask_mmap.flush() + with MMapArray(tmp_mask, shape=mask.shape, dtype=np.bool_) as mask_mmap: + copy_blockwise(mask, mask_mmap) # Create files in the scratch directory for SNAPHU outputs. _, tmp_unw = mkstemp(dir=dir_, prefix="snaphu.unw.", suffix=".f4") @@ -410,9 +407,11 @@ def unwrap( # type: ignore[no-untyped-def] # (i.e. connected component label set to 0). So compute the interferogram # magnitude and pass it as a separate input file. _, tmp_mag = mkstemp(dir=dir_, prefix="snaphu.mag.", suffix=".f4") - tmp_mag_mmap = np.memmap(tmp_mag, dtype=np.float32, shape=igram.shape) - copy_blockwise(tmp_igram_mmap, tmp_mag_mmap, transform=np.abs) - tmp_mag_mmap.flush() + with ( + MMapArray(tmp_igram, shape=igram.shape, dtype=np.complex64) as ifg_mmap, + MMapArray(tmp_mag, shape=igram.shape, dtype=np.float32) as mag_mmap, + ): + copy_blockwise(ifg_mmap, mag_mmap, transform=np.abs) # Re-run SNAPHU to compute new connected components from the unwrapped phase # as though in single-tile mode, overwriting the original connected @@ -431,11 +430,11 @@ def unwrap( # type: ignore[no-untyped-def] ) # Get the output unwrapped phase data. - tmp_unw_mmap = np.memmap(tmp_unw, dtype=np.float32, shape=unw.shape) - copy_blockwise(tmp_unw_mmap, unw) + with MMapArray(tmp_unw, shape=unw.shape, dtype=np.float32) as unw_mmap: + copy_blockwise(unw_mmap, unw) # Get the output connected component labels. - tmp_cc_mmap = np.memmap(tmp_conncomp, dtype=np.uint32, shape=conncomp.shape) - copy_blockwise(tmp_cc_mmap, conncomp) + with MMapArray(tmp_conncomp, shape=conncomp.shape, dtype=np.uint32) as cc_mmap: + copy_blockwise(cc_mmap, conncomp) return unw, conncomp diff --git a/src/snaphu/io/__init__.py b/src/snaphu/io/__init__.py index 78f81a1..3c9c769 100644 --- a/src/snaphu/io/__init__.py +++ b/src/snaphu/io/__init__.py @@ -1,9 +1,11 @@ from typing import Any from ._dataset import InputDataset, OutputDataset +from ._mmap_array import MMapArray __all__ = [ "InputDataset", + "MMapArray", "OutputDataset", ] diff --git a/src/snaphu/io/__init__.pyi b/src/snaphu/io/__init__.pyi index 525fbf6..8ea91dd 100644 --- a/src/snaphu/io/__init__.pyi +++ b/src/snaphu/io/__init__.pyi @@ -1,8 +1,10 @@ from ._dataset import InputDataset, OutputDataset +from ._mmap_array import MMapArray from ._raster import Raster __all__ = [ "InputDataset", + "MMapArray", "OutputDataset", "Raster", ] diff --git a/src/snaphu/io/_mmap_array.py b/src/snaphu/io/_mmap_array.py new file mode 100644 index 0000000..6aef51e --- /dev/null +++ b/src/snaphu/io/_mmap_array.py @@ -0,0 +1,199 @@ +from __future__ import annotations + +import mmap +import os +from collections.abc import Iterable +from contextlib import AbstractContextManager +from pathlib import Path +from typing import Any + +import numpy as np +from numpy.typing import DTypeLike + +from ._dataset import InputDataset, OutputDataset + +__all__ = [ + "MMapArray", +] + + +# TODO(Python 3.10): change return type to `TypeGuard[Iterable[Any]]` +# (see https://docs.python.org/3/library/typing.html#typing.TypeGuard) +def is_iterable(obj: Any) -> bool: + """ + Check if the input object is iterable. + + Parameters + ---------- + obj : object + The input object. + + Returns + ------- + bool + True if the argument is iterable; otherwise False. + """ + try: + iter(obj) + except TypeError: + return False + else: + return True + + +def tuple_of_ints(ints: int | Iterable[int]) -> tuple[int, ...]: + """ + Convert the input to a tuple of ints. + + Parameters + ---------- + ints : int or iterable of int + One or more integers. + + Returns + ------- + tuple of int + Tuple containing the inputs. + """ + if is_iterable(ints): + return tuple(int(i) for i in ints) + else: + return int(ints), + + +def create_or_extend_file(filepath: str | os.PathLike[str], size: int) -> None: + """ + Create a file with the specified size or extend an existing file to the same size. + + Parameters + ---------- + filepath : str or path-like + File path. + size : int + The size, in bytes, of the file. + """ + filepath = Path(filepath) + + if not filepath.is_file(): + # If the file does not exist, then create it with the specified size. + with filepath.open("wb") as fd: + fd.truncate(size) + else: + # If the file exists but is smaller than the requested size, extend the file + # length. + filesize = filepath.stat().st_size + if filesize < size: + with filepath.open("r+b") as fd: + fd.truncate(size) + + +class MMapArray(InputDataset, OutputDataset, AbstractContextManager["MMapArray"]): + """ + """ + + def __init__( + self, + filepath: str | os.PathLike[str], + shape: tuple[int, ...], + dtype: DTypeLike, + ): + """ + Create a new `MMapArray` object. + + If the file does not exist, it will be created. If the file does exist but is + smaller than the array, it will be extended to the size (in bytes) of the array. + + Parameters + ---------- + filepath : str or path-like + The file path. + shape : tuple of int + Tuple of array dimensions. + dtype : data-type + Data-type of the array's elements. Must be convertible to a `numpy.dtype` + object. + """ + + self._filepath = Path(filepath) + self._shape = tuple_of_ints(shape) + self._dtype = np.dtype(dtype) + + # Get array size in bytes. + size = np.prod(self.shape) * self.dtype.itemsize + + # If the file doesn't exist, create it with the required size. Else, ensure that + # the file size is at least `size` bytes. + create_or_extend_file(self.filepath, size) + + # Open & memory-map the file. + self._fd = self.filepath.open("r+b") + try: + self._mmap = mmap.mmap(self._fd.fileno(), 0, access=mmap.ACCESS_WRITE) + except: + self._fd.close() + raise + + @property + def filepath(self) -> Path: + """pathlib.Path : The file path.""" + return self._filepath + + @property + def shape(self) -> tuple[int, ...]: + """tuple of int : Tuple of array dimensions.""" + return self._shape + + @property + def ndim(self) -> int: + """int : Number of array dimensions.""" + return len(self.shape) + + @property + def dtype(self) -> np.dtype: + """numpy.dtype : Data-type of the array's elements.""" + return self._dtype + + @property + def closed(self) -> bool: + """bool : True if the memory-mapped file is closed.""" + return self._fd.closed + + def flush(self) -> None: + """Flushes changes made to the in-memory copy of a file back to disk.""" + self._mmap.flush() + + def close(self) -> None: + """ + Close the underlying dataset. + + Has no effect if the dataset is already closed. + """ + if self.closed: + return + + self.flush() + + try: + self._mmap.close() + finally: + self._fd.close() + + def __exit__(self, exc_type, exc_value, traceback): # type: ignore[no-untyped-def] + self.close() + + def __array__(self) -> np.ndarray: + return np.frombuffer(self._mmap, dtype=self.dtype).reshape(self.shape) + + def __getitem__(self, key: slice | tuple[slice, ...], /) -> np.ndarray: + arr = np.asanyarray(self) + return arr[key] + + def __setitem__(self, key: slice | tuple[slice, ...], value: np.ndarray, /) -> None: + arr = np.asanyarray(self) + arr[key] = value + + def __repr__(self) -> str: + filepath = self.filepath + shape = self.shape + dtype = self.dtype + return f"{type(self).__name__}({filepath=}, {shape=}, {dtype=})"