Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure memory-mapped files are safely closed #81

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions src/snaphu/_conncomp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
)
from ._snaphu import run_snaphu
from ._util import copy_blockwise, nan_to_zero, scratch_directory
from .io import InputDataset, OutputDataset
from .io import InputDataset, MMapArray, OutputDataset

__all__ = [
"grow_conncomps",
Expand Down Expand Up @@ -274,35 +274,31 @@ def grow_conncomps( # type: ignore[no-untyped-def]
# copy the input data to it. (`mkstemp` is used to avoid data races in case the
# same scratch directory was used for multiple SNAPHU processes.)
_, tmp_unw = mkstemp(dir=dir_, prefix="snaphu.unw.", suffix=".f4")
tmp_unw_mmap = np.memmap(tmp_unw, dtype=np.float32, shape=unw.shape)
copy_blockwise(unw, tmp_unw_mmap, transform=nan_to_zero)
tmp_unw_mmap.flush()
with MMapArray(tmp_unw, shape=unw.shape, dtype=np.float32) as unw_mmap:
copy_blockwise(unw, unw_mmap, transform=nan_to_zero)

# Copy the input coherence data to a raw binary file in the scratch directory.
_, tmp_corr = mkstemp(dir=dir_, prefix="snaphu.corr.", suffix=".f4")
tmp_corr_mmap = np.memmap(tmp_corr, dtype=np.float32, shape=corr.shape)
copy_blockwise(corr, tmp_corr_mmap, transform=nan_to_zero)
tmp_corr_mmap.flush()
with MMapArray(tmp_corr, shape=corr.shape, dtype=np.float32) as corr_mmap:
copy_blockwise(corr, corr_mmap, transform=nan_to_zero)

# If magnitude data was provided, copy it to a raw binary file in the scratch
# directory.
if mag is None:
tmp_mag = None
else:
_, tmp_mag = mkstemp(dir=dir_, prefix="snaphu.mag.", suffix=".f4")
tmp_mag_mmap = np.memmap(tmp_mag, dtype=np.float32, shape=mag.shape)
copy_blockwise(mag, tmp_mag_mmap, transform=nan_to_zero)
tmp_mag_mmap.flush()
with MMapArray(tmp_mag, shape=mag.shape, dtype=np.float32) as mag_mmap:
copy_blockwise(mag, mag_mmap, transform=nan_to_zero)

# If a mask was provided, copy the mask data to a raw binary file in the scratch
# directory.
if mask is None:
tmp_mask = None
else:
_, tmp_mask = mkstemp(dir=dir_, prefix="snaphu.mask.", suffix=".u1")
tmp_mask_mmap = np.memmap(tmp_mask, dtype=np.bool_, shape=mask.shape)
copy_blockwise(mask, tmp_mask_mmap)
tmp_mask_mmap.flush()
with MMapArray(tmp_mask, shape=mask.shape, dtype=np.bool_) as mask_mmap:
copy_blockwise(mask, mask_mmap)

# Create a raw file in the scratch directory for the output connected
# components.
Expand All @@ -322,7 +318,7 @@ def grow_conncomps( # type: ignore[no-untyped-def]
)

# Get the output connected component labels.
tmp_cc_mmap = np.memmap(tmp_conncomp, dtype=np.uint32, shape=conncomp.shape)
copy_blockwise(tmp_cc_mmap, conncomp)
with MMapArray(tmp_conncomp, shape=conncomp.shape, dtype=np.uint32) as cc_mmap:
copy_blockwise(cc_mmap, conncomp)

return conncomp
33 changes: 16 additions & 17 deletions src/snaphu/_unwrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from ._conncomp import regrow_conncomp_from_unw
from ._snaphu import run_snaphu
from ._util import copy_blockwise, nan_to_zero, scratch_directory
from .io import InputDataset, OutputDataset
from .io import InputDataset, MMapArray, OutputDataset

__all__ = [
"unwrap",
Expand Down Expand Up @@ -332,25 +332,22 @@
# copy the input data to it. (`mkstemp` is used to avoid data races in case the
# same scratch directory was used for multiple SNAPHU processes.)
_, tmp_igram = mkstemp(dir=dir_, prefix="snaphu.igram.", suffix=".c8")
tmp_igram_mmap = np.memmap(tmp_igram, dtype=np.complex64, shape=igram.shape)
copy_blockwise(igram, tmp_igram_mmap, transform=nan_to_zero)
tmp_igram_mmap.flush()
with MMapArray(tmp_igram, shape=igram.shape, dtype=np.complex64) as ifg_mmap:
copy_blockwise(igram, ifg_mmap, transform=nan_to_zero)

# Copy the input coherence data to a raw binary file in the scratch directory.
_, tmp_corr = mkstemp(dir=dir_, prefix="snaphu.corr.", suffix=".f4")
tmp_corr_mmap = np.memmap(tmp_corr, dtype=np.float32, shape=corr.shape)
copy_blockwise(corr, tmp_corr_mmap, transform=nan_to_zero)
tmp_corr_mmap.flush()
with MMapArray(tmp_corr, shape=corr.shape, dtype=np.float32) as corr_mmap:
copy_blockwise(corr, corr_mmap, transform=nan_to_zero)

# If a mask was provided, copy the mask data to a raw binary file in the scratch
# directory.
if mask is None:
tmp_mask = None
else:
_, tmp_mask = mkstemp(dir=dir_, prefix="snaphu.mask.", suffix=".u1")
tmp_mask_mmap = np.memmap(tmp_mask, dtype=np.bool_, shape=mask.shape)
copy_blockwise(mask, tmp_mask_mmap)
tmp_mask_mmap.flush()
with MMapArray(tmp_mask, shape=mask.shape, dtype=np.bool_) as mask_mmap:
copy_blockwise(mask, mask_mmap)

# Create files in the scratch directory for SNAPHU outputs.
_, tmp_unw = mkstemp(dir=dir_, prefix="snaphu.unw.", suffix=".f4")
Expand Down Expand Up @@ -410,9 +407,11 @@
# (i.e. connected component label set to 0). So compute the interferogram
# magnitude and pass it as a separate input file.
_, tmp_mag = mkstemp(dir=dir_, prefix="snaphu.mag.", suffix=".f4")
tmp_mag_mmap = np.memmap(tmp_mag, dtype=np.float32, shape=igram.shape)
copy_blockwise(tmp_igram_mmap, tmp_mag_mmap, transform=np.abs)
tmp_mag_mmap.flush()
with (

Check warning on line 410 in src/snaphu/_unwrap.py

View check run for this annotation

Codecov / codecov/patch

src/snaphu/_unwrap.py#L410

Added line #L410 was not covered by tests
MMapArray(tmp_igram, shape=igram.shape, dtype=np.complex64) as ifg_mmap,
MMapArray(tmp_mag, shape=igram.shape, dtype=np.float32) as mag_mmap,
):
copy_blockwise(ifg_mmap, mag_mmap, transform=np.abs)

Check warning on line 414 in src/snaphu/_unwrap.py

View check run for this annotation

Codecov / codecov/patch

src/snaphu/_unwrap.py#L414

Added line #L414 was not covered by tests

# Re-run SNAPHU to compute new connected components from the unwrapped phase
# as though in single-tile mode, overwriting the original connected
Expand All @@ -431,11 +430,11 @@
)

# Get the output unwrapped phase data.
tmp_unw_mmap = np.memmap(tmp_unw, dtype=np.float32, shape=unw.shape)
copy_blockwise(tmp_unw_mmap, unw)
with MMapArray(tmp_unw, shape=unw.shape, dtype=np.float32) as unw_mmap:
copy_blockwise(unw_mmap, unw)

# Get the output connected component labels.
tmp_cc_mmap = np.memmap(tmp_conncomp, dtype=np.uint32, shape=conncomp.shape)
copy_blockwise(tmp_cc_mmap, conncomp)
with MMapArray(tmp_conncomp, shape=conncomp.shape, dtype=np.uint32) as cc_mmap:
copy_blockwise(cc_mmap, conncomp)

return unw, conncomp
2 changes: 2 additions & 0 deletions src/snaphu/io/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from typing import Any

from ._dataset import InputDataset, OutputDataset
from ._mmap_array import MMapArray

__all__ = [
"InputDataset",
"MMapArray",
"OutputDataset",
]

Expand Down
2 changes: 2 additions & 0 deletions src/snaphu/io/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from ._dataset import InputDataset, OutputDataset
from ._mmap_array import MMapArray
from ._raster import Raster

__all__ = [
"InputDataset",
"MMapArray",
"OutputDataset",
"Raster",
]
199 changes: 199 additions & 0 deletions src/snaphu/io/_mmap_array.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
from __future__ import annotations

import mmap
import os
from collections.abc import Iterable
from contextlib import AbstractContextManager
from pathlib import Path
from typing import Any

import numpy as np
from numpy.typing import DTypeLike

from ._dataset import InputDataset, OutputDataset

__all__ = [
"MMapArray",
]


# TODO(Python 3.10): change return type to `TypeGuard[Iterable[Any]]`
# (see https://docs.python.org/3/library/typing.html#typing.TypeGuard)
def is_iterable(obj: Any) -> bool:
    """
    Determine whether an object can be iterated over.

    Parameters
    ----------
    obj : object
        The object to test.

    Returns
    -------
    bool
        True if `iter(obj)` succeeds; False if it raises `TypeError`.
    """
    try:
        iter(obj)
    except TypeError:
        # `iter` rejected the object, so it cannot be iterated over.
        iterable = False
    else:
        iterable = True
    return iterable


def tuple_of_ints(ints: int | Iterable[int]) -> tuple[int, ...]:
    """
    Normalize a single integer or an iterable of integers to a tuple of ints.

    Parameters
    ----------
    ints : int or iterable of int
        One or more integers.

    Returns
    -------
    tuple of int
        A one-element tuple if `ints` is not iterable; otherwise a tuple holding
        each element of `ints` converted with `int`.
    """
    # Scalar input: wrap the single value in a 1-tuple.
    if not is_iterable(ints):
        return (int(ints),)

    return tuple(map(int, ints))

Check warning on line 61 in src/snaphu/io/_mmap_array.py

View check run for this annotation

Codecov / codecov/patch

src/snaphu/io/_mmap_array.py#L61

Added line #L61 was not covered by tests


def create_or_extend_file(filepath: str | os.PathLike[str], size: int) -> None:
    """
    Ensure that a file exists and is at least `size` bytes long.

    If no regular file exists at `filepath`, one is created with a length of `size`
    bytes. If the file exists but is shorter than `size`, it is extended to `size`
    bytes. A file that is already `size` bytes or longer is left untouched.

    Parameters
    ----------
    filepath : str or path-like
        File path.
    size : int
        The minimum size, in bytes, of the file.
    """
    path = Path(filepath)

    if path.is_file():
        # An existing file that is already big enough needs no changes.
        if path.stat().st_size >= size:
            return
        # Open for update so that the existing contents are preserved.
        mode = "r+b"
    else:
        mode = "wb"

    with path.open(mode) as stream:
        stream.truncate(size)


class MMapArray(InputDataset, OutputDataset, AbstractContextManager["MMapArray"]):
    """
    A file-backed array that is accessed through a memory map.

    The file is opened and memory-mapped for reading and writing when the object is
    created. Its leading bytes are interpreted as an array with the given shape and
    data-type. The object can be used as a context manager: on exit, pending changes
    are flushed and both the memory map and the underlying file are closed.
    """

    def __init__(
        self,
        filepath: str | os.PathLike[str],
        shape: tuple[int, ...],
        dtype: DTypeLike,
    ):
        """
        Create a new `MMapArray` object.

        If the file does not exist, it will be created. If the file does exist but is
        smaller than the array, it will be extended to the size (in bytes) of the array.
        If the file is larger than the array, only its leading bytes are exposed as
        array elements.

        Parameters
        ----------
        filepath : str or path-like
            The file path.
        shape : tuple of int
            Tuple of array dimensions.
        dtype : data-type
            Data-type of the array's elements. Must be convertible to a `numpy.dtype`
            object.
        """
        self._filepath = Path(filepath)
        self._shape = tuple_of_ints(shape)
        self._dtype = np.dtype(dtype)

        # Number of array elements as a plain Python int. An integer dtype is
        # requested explicitly because `np.prod(())` otherwise returns the float
        # `1.0` for a zero-dimensional shape, which is not a valid file size.
        self._count = int(np.prod(self.shape, dtype=np.int64))

        # Get array size in bytes.
        size = self._count * self.dtype.itemsize

        # If the file doesn't exist, create it with the required size. Else, ensure that
        # the file size is at least `size` bytes.
        create_or_extend_file(self.filepath, size)

        # Open & memory-map the file. A length of 0 maps the whole file.
        self._fd = self.filepath.open("r+b")
        try:
            self._mmap = mmap.mmap(self._fd.fileno(), 0, access=mmap.ACCESS_WRITE)
        except BaseException:
            # Don't leak the file descriptor if the mapping could not be created.
            self._fd.close()
            raise

    @property
    def filepath(self) -> Path:
        """pathlib.Path : The file path."""
        return self._filepath

    @property
    def shape(self) -> tuple[int, ...]:
        """tuple of int : Tuple of array dimensions."""
        return self._shape

    @property
    def ndim(self) -> int:
        """int : Number of array dimensions."""
        return len(self.shape)

    @property
    def dtype(self) -> np.dtype:
        """numpy.dtype : Data-type of the array's elements."""
        return self._dtype

    @property
    def closed(self) -> bool:
        """bool : True if the memory-mapped file is closed."""
        return self._fd.closed

    def flush(self) -> None:
        """Flushes changes made to the in-memory copy of a file back to disk."""
        self._mmap.flush()

    def close(self) -> None:
        """
        Close the underlying dataset.

        Pending changes are flushed first. The memory map and the file are closed
        even if flushing fails. Has no effect if the dataset is already closed.
        """
        if self.closed:
            return

        # Nested `try`/`finally` so that a failure in an earlier step never prevents
        # the later resources from being released.
        try:
            self.flush()
        finally:
            try:
                self._mmap.close()
            finally:
                self._fd.close()

    def __exit__(self, exc_type, exc_value, traceback):  # type: ignore[no-untyped-def]
        self.close()

    def __array__(
        self, dtype: DTypeLike | None = None, copy: bool | None = None
    ) -> np.ndarray:
        """
        Return the array contents as a `numpy.ndarray`.

        Parameters
        ----------
        dtype : data-type or None, optional
            If given and different from the array's data-type, a converted copy is
            returned.
        copy : bool or None, optional
            If True, always return a copy. If False, never copy; a `ValueError` is
            raised if a data-type conversion would require one. If None (the
            default), copy only when a conversion is needed.

        Returns
        -------
        numpy.ndarray
            The array data. Unless a copy was made, the result shares memory with
            the memory map.
        """
        # `count` limits the view to the array's own elements, so a backing file
        # that is larger than the array can still be reshaped.
        arr = np.frombuffer(self._mmap, dtype=self.dtype, count=self._count)
        arr = arr.reshape(self.shape)

        if dtype is not None and np.dtype(dtype) != arr.dtype:
            if copy is False:
                errmsg = "unable to avoid copy while converting to the requested dtype"
                raise ValueError(errmsg)
            return arr.astype(dtype)

        if copy:
            return arr.copy()
        return arr

    def __getitem__(self, key: slice | tuple[slice, ...], /) -> np.ndarray:
        # Index into an array view of the mapped file.
        arr = np.asanyarray(self)
        return arr[key]

    def __setitem__(self, key: slice | tuple[slice, ...], value: np.ndarray, /) -> None:
        # Assigning through the array view writes into the memory map.
        arr = np.asanyarray(self)
        arr[key] = value

    def __repr__(self) -> str:
        filepath = self.filepath
        shape = self.shape
        dtype = self.dtype
        return f"{type(self).__name__}({filepath=}, {shape=}, {dtype=})"

Check warning on line 199 in src/snaphu/io/_mmap_array.py

View check run for this annotation

Codecov / codecov/patch

src/snaphu/io/_mmap_array.py#L196-L199

Added lines #L196 - L199 were not covered by tests