Skip to content

Commit

Permalink
Added HyperhemisphericalGridFilter
Browse files Browse the repository at this point in the history
  • Loading branch information
FlorianPfaff committed Jan 19, 2024
1 parent f7ff205 commit 6c9d859
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 0 deletions.
83 changes: 83 additions & 0 deletions pyrecest/filters/hyperhemispherical_grid_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import numpy as np
import warnings
from .abstract_grid_filter import AbstractGridFilter
from .abstract_hyperhemispherical_filter import AbstractHyperhemisphericalFilter
from pyrecest.distributions.hypersphere_subset.hyperhemispherical_grid_distribution import HyperhemisphericalGridDistribution
from pyrecest.distributions.conditional.sd_half_cond_sd_half_grid_distribution import SdHalfCondSdHalfGridDistribution
from pyrecest.distributions import BinghamDistribution
from pyrecest.distributions import WatsonDistribution
from pyrecest.distributions import VonMisesFisherDistribution
from pyrecest.distributions import HypersphericalMixture
from pyrecest.distributions import HyperhemisphericalUniformDistribution
from pyrecest.distributions import AbstractHyperhemisphericalDistribution
from pyrecest.distributions import HyperhemisphericalWatsonDistribution

class HyperhemisphericalGridFilter(AbstractGridFilter, AbstractHyperhemisphericalFilter):
def __init__(self, no_of_coefficients, dim, grid_type='eq_point_set_symm'):
self.gd = HyperhemisphericalGridDistribution.from_distribution(
HyperhemisphericalUniformDistribution(dim), no_of_coefficients, grid_type)

def set_state(self, new_state):
assert self.dim == new_state.dim
assert isinstance(new_state, AbstractHyperhemisphericalDistribution)
self.gd = new_state

def predict_identity(self, d_sys):
assert isinstance(d_sys, AbstractHyperhemisphericalDistribution)
sd_half_cond_sd_half = HyperhemisphericalGridFilter.sys_noise_to_transition_density(
d_sys, self.gd.grid_values.shape[0])
self.predict_nonlinear_via_transition_density(sd_half_cond_sd_half)

def update_identity(self, meas_noise, z=None):
assert isinstance(meas_noise, AbstractHyperhemisphericalDistribution)
if not z==None:
measNoise = measNoise.setMode(z)
curr_grid = self.gd.get_grid()
self.gd = self.gd.multiply(HyperhemisphericalGridDistribution(curr_grid, 2 * meas_noise.pdf(curr_grid).T))

def update_nonlinear(self, likelihood, z):
self.gd.grid_values = self.gd.grid_values * likelihood(z, self.gd.get_grid()).T
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
self.gd = self.gd.normalize()

def predict_nonlinear_via_transition_density(self, f_trans):
assert np.array_equal(self.gd.get_grid(), f_trans.get_grid()), \
"fTrans is using an incompatible grid."
self.gd = self.gd.normalize()
grid_values_new = self.gd.get_manifold_size() / self.gd.grid_values.shape[0] * f_trans.grid_values.dot(
self.gd.grid_values)
self.gd = HyperhemisphericalGridDistribution(self.gd.get_grid(), grid_values_new)

def get_point_estimate(self):
gd_full_sphere = self.gd.to_full_sphere()
p = BinghamDistribution.fit(gd_full_sphere.get_grid(), gd_full_sphere.grid_values.T / np.sum(
gd_full_sphere.grid_values)).mode()
if p[-1] < 0:
p = -p
return p

@staticmethod
def sys_noise_to_transition_density(d_sys, no_grid_points):
if isinstance(d_sys, AbstractDistribution):
if isinstance(d_sys, (HyperhemisphericalWatsonDistribution, WatsonDistribution)):
def trans(xkk, xk):
return np.array([2 * WatsonDistribution(xk[:, i], d_sys.kappa).pdf(xkk) for i in range(xk.shape[1])]).T

elif (isinstance(d_sys, HypersphericalMixture) and len(d_sys.dists) == 2 and
np.all(d_sys.w == 0.5) and np.array_equal(d_sys.dists[0].mu, -d_sys.dists[1].mu) and
d_sys.dists[0].kappa == d_sys.dists[1].kappa):
def trans(xkk, xk):
return np.array([(VonMisesFisherDistribution(xk[:, i], d_sys.dists[0].kappa).pdf(xkk) +
VonMisesFisherDistribution(xk[:, i], d_sys.dists[0].kappa).pdf(-xkk))
for i in range(xk.shape[1])]).T
else:
raise ValueError("Distribution not supported for predict identity. Must be zonal (rotationally symmetric around last dimension)")

print("PredictIdentity:Inefficient - Using inefficient prediction. Consider precalculating the SdHalfCondSdHalfGridDistribution and using predictNonlinearViaTransitionDensity.")
sd_half_cond_sd_half = SdHalfCondSdHalfGridDistribution.from_function(trans, no_grid_points, True, 'eq_point_set_symm', 2 * d_sys.dim)
return sd_half_cond_sd_half

else:
raise TypeError("d_sys must be an instance of AbstractDistribution")

96 changes: 96 additions & 0 deletions pyrecest/tests/filters/test_hyperhemispherical_grid_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import unittest
import numpy as np
from pyrecest.filters.hyperhemispherical_grid_filter import HyperhemisphericalGridFilter
from pyrecest.distributions.hypersphere_subset.hyperhemispherical_grid_distribution import HyperhemisphericalGridDistribution
from pyrecest.distributions.conditional.sd_half_cond_sd_half_grid_distribution import SdHalfCondSdHalfGridDistribution
from pyrecest.distributions import BinghamDistribution, HypersphericalMixture, VonMisesFisherDistribution

class HyperhemisphericalGridFilterTest(unittest.TestCase):
def test_set_state_s2(self):
np.random.seed(0)
no_grid_points = 1001
sgf = HyperhemisphericalGridFilter(no_grid_points, 3)

self.assertEqual(sgf.get_estimate().grid_values.shape, (no_grid_points, 1))

# Test if it is uniform at the beginning
self.assertAlmostEqual(np.sum(np.abs(sgf.get_estimate().grid_values - (1 / sgf.get_estimate().get_manifold_size() * np.ones((no_grid_points, 1))))), 0, delta=1e-13)

M = np.eye(3)
Z = np.array([-2, -1, 0]).reshape(-1, 1)
bd = BinghamDistribution(Z, M)
bd.F = bd.F * bd.integrate_numerically()

sgd_state = HyperhemisphericalGridDistribution.from_distribution(bd, no_grid_points)
self.assertIsInstance(sgf.gd, HyperhemisphericalGridDistribution)
sgf.set_state(sgd_state)
self.assertIsInstance(sgf.gd, HyperhemisphericalGridDistribution)

# Verify that it is no longer a uniform distribution
self.assertGreater(np.sum(np.abs(sgf.get_estimate().grid_values - (1 / sgf.get_estimate().get_manifold_size()))), 60)

# Verify estimate matches a mode of the Bingham
self.assertAlmostEqual(np.min(np.linalg.norm(sgf.get_point_estimate() - np.hstack((bd.mode(), -bd.mode())), axis=0)), 0, delta=1e-11)

# Verify errors
full_grid = sgd_state.get_grid()
sgd_state.grid = full_grid[:, -1]
sgd_state.grid_values = sgd_state.grid_values[:-1]
self.assertIsInstance(sgf.gd, HyperhemisphericalGridDistribution)
sgf_tmp = sgf.copy()

with self.assertRaises(ValueError):
sgf_tmp.set_state(sgd_state)

with self.assertRaises(ValueError):
sgf.set_state(bd)

def test_set_state_s3(self):
no_grid_points = 1001
sgf = HyperhemisphericalGridFilter(no_grid_points, 4)
self.assertEqual(sgf.get_estimate().grid_values.shape, (no_grid_points, 1))

# Test if it is uniform at the beginning
self.assertAlmostEqual(np.sum(np.abs(np.diff(sgf.get_estimate().grid_values.T))), 0)

M = np.eye(4)
Z = np.array([-2, -1, -0.5, 0]).reshape(-1, 1)
bd = BinghamDistribution(Z, M)
bd.F = bd.F * bd.integrate_numerically()

sgd_state = HyperhemisphericalGridDistribution.from_distribution(bd, no_grid_points)
sgf.set_state(sgd_state)

# Verify that it is no longer a uniform distribution
self.assertGreater(np.sum(np.abs(np.diff(sgf.get_estimate().grid_values.T))), 5)
def test_predict_converges_to_uniform_S2_S3(self):
test_predict_converges_to_uniform(3, 501, [-2, -1, 0], 3, 5e-5, 'eq_point_set_symm', 6)
test_predict_converges_to_uniform(4, 1001, [-2, -1, -0.5, 0], 5, 1e-3, 'eq_point_set', 8)
def test_predict_converges_to_uniform(dim, no_grid_points, z_values, tol_verify_greater, tol_verify_equal, eq_point_set_type, eq_point_set_arg):
sgf = HyperhemisphericalGridFilter(no_grid_points, dim)
M = np.eye(dim)
Z = np.array(z_values).reshape(-1, 1)
bd = BinghamDistribution(Z, M)
bd.F = bd.F * bd.integrate_numerically()
sgf.set_state(HyperhemisphericalGridDistribution.from_distribution(bd, no_grid_points))

# Verify that it is not a uniform distribution
assert sum(abs(np.diff(sgf.get_estimate().grid_values.T))) > tol_verify_greater

# Predict 10 times with VM-distributed noise
def trans(xkk, xk):
return 2 * np.hstack([HypersphericalMixture([VonMisesFisherDistribution(xk[:, i], 1), VonMisesFisherDistribution(-xk[:, i], 1)], [0.5, 0.5]).pdf(xkk) for i in range(xk.shape[1])])

sdsd = SdHalfCondSdHalfGridDistribution.from_function(trans, no_grid_points, True, eq_point_set_type, eq_point_set_arg)
manifold_size = sgf.get_estimate().get_manifold_size()

for i in range(10):
values_alternative_formula = (manifold_size / sgf.get_estimate().get_grid().shape[1]) * np.sum(sgf.get_estimate().grid_values.T * sdsd.grid_values, axis=1)
sgf.predict_nonlinear_via_transition_density(sdsd)
assert np.allclose(sgf.get_estimate().grid_values, values_alternative_formula, atol=1e-12)

# Verify that it is approximately uniform now
assert np.isclose(sum(abs(np.diff(sgf.get_estimate().grid_values.T))), 0, atol=tol_verify_equal)

if __name__ == '__main__':
unittest.main()

0 comments on commit 6c9d859

Please sign in to comment.