Add stitched phase cosine similarity raster to outputs (#446)
* Modify `io.process_blocks` to accept overlapping blocks

Result is similar to `map_overlaps`, but with an arbitrary number of input or output rasters (see the usage sketch after this list)

* create and pass through similarity raster

* fix existing similarity file search

* use `nan` for nodata, mask pixels with all nan/zero

* fix sim tests
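A hedged sketch of the new `overlaps` usage (the import path follows this commit's file layout; `reader`, `writer`, and `calc_sim` stand in for the objects built in the diffs below):

```python
from dolphin.io._process import process_blocks  # path per this commit

# Each block is read with extra pixels on every side, processed, then
# trimmed back to its core before writing, so windowed filters see a
# full neighborhood even at block edges.
process_blocks(
    [reader],               # Sequence[StackReader]
    writer,                 # DatasetWriter
    func=calc_sim,          # a BlockProcessor (see similarity.py below)
    block_shape=(512, 512),
    overlaps=(11, 11),      # pad by the search radius in rows and cols
    num_threads=5,
)
```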
scottstanie authored Oct 12, 2024
1 parent da4a875 commit cb32fc7
Showing 8 changed files with 215 additions and 84 deletions.
64 changes: 51 additions & 13 deletions src/dolphin/io/_process.py
@@ -4,9 +4,10 @@
from numpy.typing import ArrayLike
from tqdm.auto import tqdm

from dolphin import HalfWindow, Strides
from dolphin.utils import DummyProcessPoolExecutor

from ._blocks import iter_blocks
from ._blocks import BlockIndices, StridedBlockManager
from ._readers import StackReader
from ._writers import DatasetWriter

@@ -21,7 +22,10 @@ class BlockProcessor(Protocol):
"""

def __call__(
self, readers: Sequence[StackReader], rows: slice, cols: slice
self,
readers: Sequence[StackReader],
rows: slice,
cols: slice,
) -> tuple[ArrayLike, slice, slice]: ...
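A minimal `BlockProcessor` satisfying this protocol might look like the following sketch (`mean_over_time` is hypothetical, not part of the commit):

```python
import numpy as np

# Hypothetical BlockProcessor: temporal mean of the first reader's block.
# It returns the processed array together with the slices it covers,
# matching the tuple[ArrayLike, slice, slice] contract above.
def mean_over_time(readers, rows: slice, cols: slice):
    block = readers[0][:, rows, cols]  # (time, rows, cols) window
    return np.nanmean(block, axis=0), rows, cols
```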


@@ -30,32 +34,66 @@ def process_blocks(
writer: DatasetWriter,
func: BlockProcessor,
block_shape: tuple[int, int] = (512, 512),
overlaps: tuple[int, int] = (0, 0),
num_threads: int = 5,
):
"""Perform block-wise processing over blocks in `readers`, writing to `writer`.

Used to read and process a stack of rasters in parallel, setting up a queue
of results for the `writer` to save.

Parameters
----------
readers : Sequence[StackReader]
Sequence of input readers to read data from.
writer : DatasetWriter
Output writer to write data to.
func : BlockProcessor
Function to process each block.
block_shape : tuple[int, int], optional
Shape of each block to process.
overlaps : tuple[int, int], optional
Amount of overlap between blocks in (row, col) directions.
By default (0, 0).
num_threads : int, optional
Number of threads to use, by default 5.
Note that the parallelism happens using a `ThreadPoolExecutor`, so `func` should
be a function which releases the GIL during computation (e.g. using numpy).
"""
shape = readers[0].shape[-2:]
slices = list(iter_blocks(shape, block_shape=block_shape))
block_manager = StridedBlockManager(
arr_shape=readers[0].shape[-2:],
block_shape=block_shape,
# Here we are not using the striding mechanism
strides=Strides(1, 1),
# The "half window" is how much overlap we read in
half_window=HalfWindow(*overlaps),
)
total_blocks = sum(1 for _ in block_manager.iter_blocks())
pbar = tqdm(total=total_blocks)

pbar = tqdm(total=len(slices))

# Define the callback to write the result to an output DatasetWriter
def write_callback(fut: Future):
data, rows, cols = fut.result()
writer[..., rows, cols] = data
pbar.update()

Executor = ThreadPoolExecutor if num_threads > 1 else DummyProcessPoolExecutor
futures: set[Future] = set()

# Create a helper function to perform the trimming after processing
def _run_and_trim(
func,
readers,
out_idxs: BlockIndices,
trim_idxs: BlockIndices,
in_idxs: BlockIndices,
):
in_rows, in_cols = in_idxs
out_rows, out_cols = out_idxs
trim_rows, trim_cols = trim_idxs
out_data, _, _ = func(readers=readers, rows=in_rows, cols=in_cols)
return out_data[..., trim_rows, trim_cols], out_rows, out_cols

with Executor(num_threads) as exc:
for rows, cols in slices:
future = exc.submit(func, readers=readers, rows=rows, cols=cols)
for out_idxs, trim_idxs, in_idxs, _, _ in block_manager.iter_blocks():
future = exc.submit(
_run_and_trim, func, readers, out_idxs, trim_idxs, in_idxs
)
future.add_done_callback(write_callback)
futures.add(future)

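The padded-read/trim/write pattern that `_run_and_trim` applies can be shown in isolation (slice values here are illustrative, not taken from `StridedBlockManager`):

```python
import numpy as np

arr = np.arange(100 * 100, dtype="float64").reshape(100, 100)
overlap = 5
# Padded input window: the output block [20:40, 20:40] grown by `overlap`.
in_rows = in_cols = slice(15, 45)
padded = arr[in_rows, in_cols]       # a 30 x 30 read
# Trim indices drop the overlap again after processing...
trim_rows = trim_cols = slice(overlap, -overlap)
core = padded[trim_rows, trim_cols]  # back to 20 x 20
# ...so the write lands exactly on the output block.
out_rows = out_cols = slice(20, 40)
assert np.array_equal(core, arr[out_rows, out_cols])
```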
2 changes: 2 additions & 0 deletions src/dolphin/masking.py
@@ -53,6 +53,8 @@ def combine_mask_files(
):
"""Combine multiple mask files into a single mask file.

All `mask_files` must be the same size and projected on the same grid.

Parameters
----------
mask_files : list of Path or str
18 changes: 12 additions & 6 deletions src/dolphin/similarity.py
@@ -79,7 +79,7 @@ def max_similarity(
Returns
-------
np.ndarray
2D array (shape (rows, cols)) of the maximum similarity for any neighrbor
2D array (shape (rows, cols)) of the maximum similarity for any neighbor
at a pixel.
"""
@@ -102,9 +102,12 @@ def _create_loop_and_run(
raise ValueError("ifg_stack must be complex")

unit_ifgs = np.exp(1j * np.angle(ifg_stack))
out_similarity = np.zeros((rows, cols), dtype="float32")
out_similarity = np.full((rows, cols), fill_value=np.nan, dtype="float32")
if mask is None:
mask = np.ones((rows, cols), dtype="bool")
# Mark any nans/all zeros as invalid
invalid_mask = np.nan_to_num(ifg_stack).sum(axis=0) == 0
mask[invalid_mask] = False

if mask.shape != (rows, cols):
raise ValueError(f"{ifg_stack.shape = }, but {mask.shape = }")
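The all-NaN/all-zero test added above behaves as follows on a toy stack (a sketch; nonzero values that cancel exactly would also be flagged, which real data should not produce):

```python
import numpy as np

ifg_stack = np.array(
    [[[np.nan, 1 + 1j], [0j, 2 + 0j]],
     [[np.nan, 2 - 1j], [0j, 1 + 0j]]]  # shape (time=2, rows=2, cols=2)
)
# nan_to_num maps NaN -> 0, so an all-NaN or all-zero series sums to 0.
invalid_mask = np.nan_to_num(ifg_stack).sum(axis=0) == 0
print(invalid_mask)  # [[ True False]
                     #  [ True False]]
```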
@@ -125,7 +128,7 @@ def _make_loop_function(
"""

@numba.njit(nogil=True, parallel=True)
def _masked_median_sim_loop(
def _masked_sim_loop(
ifg_stack: np.ndarray,
idxs: np.ndarray,
mask: np.ndarray,
@@ -145,6 +148,7 @@ def _masked_median_sim_loop(
if not m0:
continue
x0 = ifg_stack[:, r0, c0]

cur_sim_vec = cur_sim[r0, c0]
count = 0

Expand All @@ -169,7 +173,7 @@ def _masked_median_sim_loop(
out_similarity[r0, c0] = summary_func(cur_sim_vec[:count])
return out_similarity

return _masked_median_sim_loop
return _masked_sim_loop
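For context, the per-neighbor value being summarized is, assuming the usual definition of phase cosine similarity, the temporal mean of the cosine of the phase difference between two pixels; with unit phasors this reduces to the real part of the mean cross product:

```python
import numpy as np

def phase_cosine_similarity(x0: np.ndarray, x1: np.ndarray) -> float:
    """Temporal mean of cos(phase difference) of two complex series (sketch)."""
    u0 = np.exp(1j * np.angle(x0))  # unit phasors: amplitudes discarded
    u1 = np.exp(1j * np.angle(x1))
    return float(np.mean((u0 * u1.conj()).real))
```

`max_similarity` keeps the largest such value over the circular neighborhood, while the `median` similarity type summarizes the neighbor values via `summary_func`, as in the loop above.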


def get_circle_idxs(
@@ -289,12 +293,12 @@ def create_similarities(
else:
raise ValueError(f"Unrecognized {sim_type = }")

zero_block = np.zeros(block_shape, dtype="float32")
nodata_block = np.full(block_shape, fill_value=np.nan, dtype="float32")

def calc_sim(readers, rows, cols):
block = readers[0][:, rows, cols]
if np.sum(block) == 0 or np.isnan(block).all():
return zero_block[rows, cols], rows, cols
return nodata_block[rows, cols], rows, cols

out_avg = sim_function(ifg_stack=block, search_radius=search_radius)
logger.debug(f"{rows = }, {cols = }, {block.shape = }, {out_avg.shape = }")
@@ -312,12 +316,14 @@ def calc_sim(readers, rows, cols):
like_filename=ifg_file_list[0],
dtype="float32",
driver="GTiff",
nodata=np.nan,
)
process_blocks(
[reader],
writer,
func=calc_sim,
block_shape=block_shape,
overlaps=(search_radius, search_radius),
num_threads=num_threads,
)
writer.notify_finished()
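Since the raster is written with `nodata=np.nan`, downstream code can mask invalid pixels with a plain NaN check. A hypothetical read using rasterio (not a dependency of this code path; the filename is illustrative):

```python
import numpy as np
import rasterio  # hypothetical reader choice

with rasterio.open("similarity_20170101_20171231.tif") as src:
    sim = src.read(1)

valid = ~np.isnan(sim)
print(f"{valid.mean():.1%} valid pixels, "
      f"median similarity {np.nanmedian(sim):.3f}")
```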
19 changes: 16 additions & 3 deletions src/dolphin/workflows/displacement.py
@@ -35,6 +35,7 @@ class OutputPaths(NamedTuple):
stitched_ps_file: Path
stitched_amp_dispersion_file: Path
stitched_shp_count_file: Path
stitched_similarity_file: Path
unwrapped_paths: list[Path] | None
conncomp_paths: list[Path] | None
timeseries_paths: list[Path] | None
@@ -128,6 +129,7 @@ def run(
ps_file_list: list[Path] = []
amp_dispersion_file_list: list[Path] = []
shp_count_file_list: list[Path] = []
similarity_file_list: list[Path] = []
# The comp_slc tracking object is a dict, since we'll need to organize
# multiple comp slcs by burst (they'll have the same filename)
comp_slc_dict: dict[str, list[Path]] = {}
@@ -161,15 +163,22 @@
for fut in fut_to_burst:
burst = fut_to_burst[fut]

cur_ifg_list, comp_slcs, temp_coh, ps_file, amp_disp_file, shp_count = (
fut.result()
)
(
cur_ifg_list,
comp_slcs,
temp_coh,
ps_file,
amp_disp_file,
shp_count,
similarity,
) = fut.result()
ifg_file_list.extend(cur_ifg_list)
comp_slc_dict[burst] = comp_slcs
temp_coh_file_list.append(temp_coh)
ps_file_list.append(ps_file)
amp_dispersion_file_list.append(amp_disp_file)
shp_count_file_list.append(shp_count)
similarity_file_list.append(similarity)

# ###################################
# 2. Stitch burst-wise interferograms
@@ -186,12 +195,14 @@
stitched_ps_file,
stitched_amp_dispersion_file,
stitched_shp_count_file,
stitched_similarity_file,
) = stitching_bursts.run(
ifg_file_list=ifg_file_list,
temp_coh_file_list=temp_coh_file_list,
ps_file_list=ps_file_list,
amp_dispersion_list=amp_dispersion_file_list,
shp_count_file_list=shp_count_file_list,
similarity_file_list=similarity_file_list,
stitched_ifg_dir=cfg.interferogram_network._directory,
output_options=cfg.output_options,
file_date_fmt=cfg.input_options.cslc_date_fmt,
@@ -212,6 +223,7 @@
stitched_ps_file=stitched_ps_file,
stitched_amp_dispersion_file=stitched_amp_dispersion_file,
stitched_shp_count_file=stitched_shp_count_file,
stitched_similarity_file=stitched_similarity_file,
unwrapped_paths=None,
conncomp_paths=None,
timeseries_paths=None,
@@ -354,6 +366,7 @@
stitched_ps_file=stitched_ps_file,
stitched_amp_dispersion_file=stitched_amp_dispersion_file,
stitched_shp_count_file=stitched_shp_count_file,
stitched_similarity_file=stitched_similarity_file,
unwrapped_paths=unwrapped_paths,
# TODO: Let's keep the unwrapped_paths since all the outputs are
# corresponding to those and if we have a network unwrapping, the
23 changes: 21 additions & 2 deletions src/dolphin/workflows/sequential.py
@@ -18,6 +18,7 @@
from dolphin import io
from dolphin._types import Filename
from dolphin.io import VRTStack
from dolphin.similarity import create_similarities
from dolphin.stack import MiniStackPlanner

from .config import ShpMethod
@@ -51,7 +52,7 @@ def run_wrapped_phase_sequential(
block_shape: tuple[int, int] = (512, 512),
baseline_lag: Optional[int] = None,
**tqdm_kwargs,
) -> tuple[list[Path], list[Path], Path, Path]:
) -> tuple[list[Path], list[Path], Path, Path, Path]:
"""Estimate wrapped phase using batches of ministacks."""
if strides is None:
strides = {"x": 1, "y": 1}
@@ -149,6 +150,18 @@ def already_processed(d: Path, search_ext: str = ".tif") -> bool:
_average_rasters(temp_coh_files, output_temp_coh_file, "Float32")
_average_rasters(shp_count_files, output_shp_count_file, "Int16")

# Create one phase similarity raster
output_similarity_file = output_folder / f"similarity_{full_span}.tif"
create_similarities(
ifg_file_list=cur_output_files,
output_file=output_similarity_file,
# TODO: any of these configurable?
search_radius=11,
sim_type="median",
block_shape=block_shape,
num_threads=3,
)

# Combine the separate SLC output lists into a single list
all_slc_files = list(chain.from_iterable(output_slc_files))
all_comp_slc_files = [ms.get_compressed_slc_info().path for ms in ministacks]
@@ -163,7 +176,13 @@ def already_processed(d: Path, search_ext: str = ".tif") -> bool:
p.rename(output_folder / p.name)
comp_slc_outputs.append(output_folder / p.name)

return out_pl_slcs, comp_slc_outputs, output_temp_coh_file, output_shp_count_file
return (
out_pl_slcs,
comp_slc_outputs,
output_temp_coh_file,
output_shp_count_file,
output_similarity_file,
)


def _get_outputs_from_folder(