Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Add decimator and gaussian filter #208

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 227 additions & 1 deletion src/eddymotion/data/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@

from __future__ import annotations

from numbers import Number

import numpy as np
from scipy.ndimage import median_filter
from nibabel import Nifti1Image, load
from nibabel.affines import apply_affine, voxel_sizes
from scipy.ndimage import gaussian_filter as _gs
from scipy.ndimage import map_coordinates, median_filter
from skimage.morphology import ball

DEFAULT_DTYPE = "int16"
Expand Down Expand Up @@ -92,3 +97,224 @@ def advanced_clip(
data = np.round(255 * data).astype(dtype)

return data


def gaussian_filter(
data: np.ndarray,
vox_width: float | tuple[float, float, float],
) -> np.ndarray:
"""
Applies a Gaussian smoothing filter to a n-dimensional array.

This function smooths the input data using a Gaussian filter with a specified
width (sigma) in voxels along each relevant dimension. It automatically
handles different data dimensionalities (2D, 3D, or 4D) and ensures that
smoothing is not applied along the time or orientation dimension (if present
in 4D data).

Parameters
----------
data : :obj:`~numpy.ndarray`
The input data array.
vox_width : :obj:`float` or :obj:`tuple` of three :obj:`float`
The smoothing kernel width (sigma) in voxels. If a single :obj:`float` is provided,
it is applied uniformly across all spatial dimensions. Alternatively, a
tuple of three floats can be provided to specify different sigma values
for each spatial dimension (x, y, z).

Returns
-------
:obj:`~numpy.ndarray`
The smoothed data array.

"""

data = np.squeeze(data) # Drop unused dimensions
ndim = data.ndim

if isinstance(vox_width, Number):
vox_width = tuple([vox_width] * min(3, ndim))

# Do not smooth across time/orientation (if present in 4D data)
if ndim == 4 and len(vox_width) == 3:
vox_width = (*vox_width, 0)

return _gs(data, vox_width)


def downsample(
in_file: str,
shape: tuple[int, int, int],
smooth: bool | tuple[int, int, int] = True,
order: int = 3,
nonnegative: bool = True,
) -> Nifti1Image:
"""
Downsamples a 3D or 4D Nifti image by a specified downsampling factor.

This function downsamples a Nifti image by averaging voxels within a user-defined
factor in each spatial dimension. It optionally applies Gaussian smoothing
before downsampling to reduce aliasing artifacts. The function also handles
updating the affine transformation matrix to reflect the change in voxel size.

Parameters
----------
in_file : :obj:`str`
Path to the input NIfTI image file.
factor : :obj:`int` or :obj:`tuple`
The downsampling factor. If a single integer is provided, it is applied
uniformly across all spatial dimensions. Alternatively, a tuple of three
integers can be provided to specify different downsampling factors for each
spatial dimension (x, y, z). Values must be greater than 0.
smooth : :obj:`bool` or :obj:`tuple`, optional (default=``True``)
Controls application of Gaussian smoothing before downsampling. If True,
a smoothing kernel size equal to the downsampling factor is applied.
Alternatively, a tuple of three integers can be provided to specify
different smoothing kernel sizes for each spatial dimension. Setting to
False disables smoothing.
order : :obj:`int`, optional (default=3)
The order of the spline interpolation used for downsampling. Higher
orders provide smoother results but are computationally more expensive.
nonnegative : :obj:`bool`, optional (default=``True``)
If True, negative values in the downsampled data are set to zero.

Returns
-------
:obj:`~nibabel.Nifti1Image`
The downsampled NIfTI image object.

"""

imnii = load(in_file)
data = np.squeeze(imnii.get_fdata()) # Remove unused dimensions
datashape = np.array(data.shape)
shape = np.array(shape)
ndim = data.ndim

if smooth:
if smooth is True:
smooth = datashape[:3] / shape[:3]
data = gaussian_filter(data, smooth)

extents = np.abs(
apply_affine(imnii.affine, datashape - 1)
- apply_affine(imnii.affine, (0.0, 0.0, 0.0))
)
newzooms = extents / shape

# Update affine transformation
newaffine = np.eye(4)
oldzooms = voxel_sizes(imnii.affine)
newaffine[:3, :3] = np.diag(newzooms / oldzooms) @ imnii.affine[:3, :3]

# Update offset so new array is aligned with original
newaffine[:3, 3] = (
apply_affine(imnii.affine, 0.5 * datashape)
- apply_affine(newaffine, 0.5 * shape)
)

xfm = np.linalg.inv(imnii.affine) @ newaffine

# Create downsampled grid
down_grid = np.array(
np.meshgrid(
*[np.arange(_s, step=1) for _s in shape],
indexing="ij",
)
)

# Locations is an Nx3 array of index coordinates of the original image where we sample
locations = apply_affine(xfm, down_grid.reshape((ndim, np.prod(shape))).T)

# Resample data on the new grid
resampled = map_coordinates(
data,
locations.T,
order=order,
mode="mirror",
prefilter=True,
).reshape(shape)

# Set negative values to zero (optional)
if order > 2 and nonnegative:
resampled[resampled < 0] = 0

# Create new Nifti image with updated information
newnii = Nifti1Image(resampled, newaffine, imnii.header)
newnii.set_sform(newaffine, code=1)
newnii.set_qform(newaffine, code=1)

return newnii


def decimate(
in_file: str,
factor: int | tuple[int, int, int],
smooth: bool | tuple[int, int, int] = True,
nonnegative: bool = True,
) -> Nifti1Image:
"""
Decimates a 3D or 4D Nifti image by a specified downsampling factor.

This function downsamples a Nifti image by averaging voxels within a user-defined
factor in each spatial dimension. It optionally applies Gaussian smoothing
before downsampling to reduce aliasing artifacts. The function also handles
updating the affine transformation matrix to reflect the change in voxel size.

Parameters
----------
in_file : :obj:`str`
Path to the input NIfTI image file.
factor : :obj:`int` or :obj:`tuple`
The downsampling factor. If a single integer is provided, it is applied
uniformly across all spatial dimensions. Alternatively, a tuple of three
integers can be provided to specify different downsampling factors for each
spatial dimension (x, y, z). Values must be greater than 0.
smooth : :obj:`bool` or :obj:`tuple`, optional (default=``True``)
Controls application of Gaussian smoothing before downsampling. If True,
a smoothing kernel size equal to the downsampling factor is applied.
Alternatively, a tuple of three integers can be provided to specify
different smoothing kernel sizes for each spatial dimension. Setting to
False disables smoothing.
nonnegative : :obj:`bool`, optional (default=``True``)
If True, negative values in the downsampled data are set to zero.

Returns
-------
:obj:`~nibabel.Nifti1Image`
The downsampled NIfTI image object.

"""

imnii = load(in_file)
data = np.squeeze(imnii.get_fdata()) # Remove unused dimensions
ndim = data.ndim

if isinstance(factor, Number):
factor = tuple([factor] * min(3, ndim))

if any(f <= 0 for f in factor[:3]):
raise ValueError("All spatial downsampling factors must be positive.")

if ndim == 4 and len(factor) == 3:
factor = (*factor, 0)

if smooth:
if smooth is True:
smooth = factor
data = gaussian_filter(data, smooth)

# Update affine transformation
newaffine = imnii.affine.copy()
newaffine[:3, :3] = np.array(factor[:3]) * newaffine[:3, :3]

# Create new Nifti image with updated information
newnii = Nifti1Image(
data[::factor[0], ::factor[1], ::factor[2]],
newaffine,
imnii.header,
)
newnii.set_sform(newaffine, code=1)
newnii.set_qform(newaffine, code=1)

return newnii
146 changes: 146 additions & 0 deletions test/test_filtering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
#
# Copyright 2024 The NiPreps Developers <nipreps@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# We support and encourage derived works from this project, please read
# about our expectations at
#
# https://www.nipreps.org/community/licensing/
#
"""Unit tests exercising data filtering utilities."""
import nibabel as nb
import numpy as np

import pytest

from eddymotion.data.filtering import decimate, downsample


@pytest.mark.parametrize(
("size", "block_size"),
[
((20, 20, 20), (5, 5, 5),),
((21, 21, 21), (5, 5, 5),),
],
)
@pytest.mark.parametrize(
("zoom_x", ),
[(1.0, ), (-1.0, ), (2.0, ), (-2.0, )],
)
@pytest.mark.parametrize(
("zoom_y", ),
[(1.0, ), (-1.0, ), (2.0, ), (-2.0, )],
)
@pytest.mark.parametrize(
("zoom_z", ),
[(1.0, ), (-1.0, ), (2.0, ), (-2.0, )],
)
@pytest.mark.parametrize(
("angle_x", ),
[(0.0, ), (0.2, ), (-0.05, )],
)
@pytest.mark.parametrize(
("angle_y", ),
[(0.0, ), (0.2, ), (-0.05, )],
)
@pytest.mark.parametrize(
("angle_z", ),
[(0.0, ), (0.2, ), (-0.05, )],
)
@pytest.mark.parametrize(
("offsets", ),
[
(None, ),
((0.0, 0.0, 0.0),),
],
)
def test_decimation(
tmp_path,
size,
block_size,
zoom_x,
zoom_y,
zoom_z,
angle_x,
angle_y,
angle_z,
offsets,
outdir,
):
"""Exercise decimation."""

# Calculate the number of sub-blocks in each dimension
num_blocks = [s // b for s, b in zip(size, block_size)]

# Create the empty array
voxel_array = np.zeros(size, dtype=np.uint16)

# Fill the array with increasing values based on sub-block position
current_block = 0
for k in range(num_blocks[2]):
for j in range(num_blocks[1]):
for i in range(num_blocks[0]):
voxel_array[
i * block_size[0]:(i + 1) * block_size[0],
j * block_size[1]:(j + 1) * block_size[1],
k * block_size[2]:(k + 1) * block_size[2]
] = current_block
current_block += 1

fname = tmp_path / "test_img.nii.gz"

affine = np.eye(4)
affine[:3, :3] = (
nb.eulerangles.euler2mat(x=angle_x, y=angle_y, z=angle_z)
@ np.diag((zoom_x, zoom_y, zoom_z))
@ affine[:3, :3]
)

if offsets is None:
affine[:3, 3] = -0.5 * nb.affines.apply_affine(affine, np.array(size) - 1)

test_image = nb.Nifti1Image(voxel_array.astype(np.uint16), affine, None)
test_image.header.set_data_dtype(np.uint16)
test_image.to_filename(fname)

# Need to define test oracle. For now, just see if it doesn't smoke.
out = decimate(fname, factor=2, smooth=False)

out = downsample(fname, shape=(10, 10, 10), smooth=False, order=0)

if outdir:
from niworkflows.interfaces.reportlets.registration import (
SimpleBeforeAfterRPT as SimpleBeforeAfter,
)

out.to_filename(tmp_path / "decimated.nii.gz")

SimpleBeforeAfter(
after_label="Decimated",
before_label="Original",
after=str(tmp_path / "decimated.nii.gz"),
before=str(fname),
out_report=str(outdir / f'decimated-{tmp_path.name}.svg'),
).run()

out.to_filename(tmp_path / "downsampled.nii.gz")
SimpleBeforeAfter(
after_label="Downsampled",
before_label="Original",
after=str(tmp_path / "downsampled.nii.gz"),
before=str(fname),
out_report=str(outdir / f'downsampled-{tmp_path.name}.svg'),
).run()
Loading