-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
a15ecdc
commit 6fb02b1
Showing
2 changed files
with
179 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
import numpy as np | ||
import warnings | ||
from .abstract_grid_filter import AbstractGridFilter | ||
from .abstract_hyperhemispherical_filter import AbstractHyperhemisphericalFilter | ||
from pyrecest.distributions.hypersphere_subset.hyperhemispherical_grid_distribution import HyperhemisphericalGridDistribution | ||
from pyrecest.distributions.conditional.sd_half_cond_sd_half_grid_distribution import SdHalfCondSdHalfGridDistribution | ||
from pyrecest.distributions import BinghamDistribution | ||
from pyrecest.distributions import WatsonDistribution | ||
from pyrecest.distributions import VonMisesFisherDistribution | ||
from pyrecest.distributions import HypersphericalMixture | ||
from pyrecest.distributions import HyperhemisphericalUniformDistribution | ||
from pyrecest.distributions import AbstractHyperhemisphericalDistribution | ||
from pyrecest.distributions import HyperhemisphericalWatsonDistribution | ||
|
||
class HyperhemisphericalGridFilter(AbstractGridFilter, AbstractHyperhemisphericalFilter): | ||
def __init__(self, no_of_coefficients, dim, grid_type='eq_point_set_symm'): | ||
self.gd = HyperhemisphericalGridDistribution.from_distribution( | ||
HyperhemisphericalUniformDistribution(dim), no_of_coefficients, grid_type) | ||
|
||
def set_state(self, new_state): | ||
assert self.dim == new_state.dim | ||
assert isinstance(new_state, AbstractHyperhemisphericalDistribution) | ||
self.gd = new_state | ||
|
||
def predict_identity(self, d_sys): | ||
assert isinstance(d_sys, AbstractHyperhemisphericalDistribution) | ||
sd_half_cond_sd_half = HyperhemisphericalGridFilter.sys_noise_to_transition_density( | ||
d_sys, self.gd.grid_values.shape[0]) | ||
self.predict_nonlinear_via_transition_density(sd_half_cond_sd_half) | ||
|
||
def update_identity(self, meas_noise, z=None): | ||
assert isinstance(meas_noise, AbstractHyperhemisphericalDistribution) | ||
if not z==None: | ||
measNoise = measNoise.setMode(z) | ||
curr_grid = self.gd.get_grid() | ||
self.gd = self.gd.multiply(HyperhemisphericalGridDistribution(curr_grid, 2 * meas_noise.pdf(curr_grid).T)) | ||
|
||
def update_nonlinear(self, likelihood, z): | ||
self.gd.grid_values = self.gd.grid_values * likelihood(z, self.gd.get_grid()).T | ||
with warnings.catch_warnings(): | ||
warnings.simplefilter("ignore", category=RuntimeWarning) | ||
self.gd = self.gd.normalize() | ||
|
||
def predict_nonlinear_via_transition_density(self, f_trans): | ||
assert np.array_equal(self.gd.get_grid(), f_trans.get_grid()), \ | ||
"fTrans is using an incompatible grid." | ||
self.gd = self.gd.normalize() | ||
grid_values_new = self.gd.get_manifold_size() / self.gd.grid_values.shape[0] * f_trans.grid_values.dot( | ||
self.gd.grid_values) | ||
self.gd = HyperhemisphericalGridDistribution(self.gd.get_grid(), grid_values_new) | ||
|
||
def get_point_estimate(self): | ||
gd_full_sphere = self.gd.to_full_sphere() | ||
p = BinghamDistribution.fit(gd_full_sphere.get_grid(), gd_full_sphere.grid_values.T / np.sum( | ||
gd_full_sphere.grid_values)).mode() | ||
if p[-1] < 0: | ||
p = -p | ||
return p | ||
|
||
@staticmethod | ||
def sys_noise_to_transition_density(d_sys, no_grid_points): | ||
if isinstance(d_sys, AbstractDistribution): | ||
if isinstance(d_sys, (HyperhemisphericalWatsonDistribution, WatsonDistribution)): | ||
def trans(xkk, xk): | ||
return np.array([2 * WatsonDistribution(xk[:, i], d_sys.kappa).pdf(xkk) for i in range(xk.shape[1])]).T | ||
|
||
elif (isinstance(d_sys, HypersphericalMixture) and len(d_sys.dists) == 2 and | ||
np.all(d_sys.w == 0.5) and np.array_equal(d_sys.dists[0].mu, -d_sys.dists[1].mu) and | ||
d_sys.dists[0].kappa == d_sys.dists[1].kappa): | ||
def trans(xkk, xk): | ||
return np.array([(VonMisesFisherDistribution(xk[:, i], d_sys.dists[0].kappa).pdf(xkk) + | ||
VonMisesFisherDistribution(xk[:, i], d_sys.dists[0].kappa).pdf(-xkk)) | ||
for i in range(xk.shape[1])]).T | ||
else: | ||
raise ValueError("Distribution not supported for predict identity. Must be zonal (rotationally symmetric around last dimension)") | ||
|
||
print("PredictIdentity:Inefficient - Using inefficient prediction. Consider precalculating the SdHalfCondSdHalfGridDistribution and using predictNonlinearViaTransitionDensity.") | ||
sd_half_cond_sd_half = SdHalfCondSdHalfGridDistribution.from_function(trans, no_grid_points, True, 'eq_point_set_symm', 2 * d_sys.dim) | ||
return sd_half_cond_sd_half | ||
|
||
else: | ||
raise TypeError("d_sys must be an instance of AbstractDistribution") | ||
|
96 changes: 96 additions & 0 deletions
96
pyrecest/tests/filters/test_hyperhemispherical_grid_filter.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
import unittest | ||
import numpy as np | ||
from pyrecest.filters.hyperhemispherical_grid_filter import HyperhemisphericalGridFilter | ||
from pyrecest.distributions.hypersphere_subset.hyperhemispherical_grid_distribution import HyperhemisphericalGridDistribution | ||
from pyrecest.distributions.conditional.sd_half_cond_sd_half_grid_distribution import SdHalfCondSdHalfGridDistribution | ||
from pyrecest.distributions import BinghamDistribution, HypersphericalMixture, VonMisesFisherDistribution | ||
|
||
class HyperhemisphericalGridFilterTest(unittest.TestCase): | ||
def test_set_state_s2(self): | ||
np.random.seed(0) | ||
no_grid_points = 1001 | ||
sgf = HyperhemisphericalGridFilter(no_grid_points, 3) | ||
|
||
self.assertEqual(sgf.get_estimate().grid_values.shape, (no_grid_points, 1)) | ||
|
||
# Test if it is uniform at the beginning | ||
self.assertAlmostEqual(np.sum(np.abs(sgf.get_estimate().grid_values - (1 / sgf.get_estimate().get_manifold_size() * np.ones((no_grid_points, 1))))), 0, delta=1e-13) | ||
|
||
M = np.eye(3) | ||
Z = np.array([-2, -1, 0]).reshape(-1, 1) | ||
bd = BinghamDistribution(Z, M) | ||
bd.F = bd.F * bd.integrate_numerically() | ||
|
||
sgd_state = HyperhemisphericalGridDistribution.from_distribution(bd, no_grid_points) | ||
self.assertIsInstance(sgf.gd, HyperhemisphericalGridDistribution) | ||
sgf.set_state(sgd_state) | ||
self.assertIsInstance(sgf.gd, HyperhemisphericalGridDistribution) | ||
|
||
# Verify that it is no longer a uniform distribution | ||
self.assertGreater(np.sum(np.abs(sgf.get_estimate().grid_values - (1 / sgf.get_estimate().get_manifold_size()))), 60) | ||
|
||
# Verify estimate matches a mode of the Bingham | ||
self.assertAlmostEqual(np.min(np.linalg.norm(sgf.get_point_estimate() - np.hstack((bd.mode(), -bd.mode())), axis=0)), 0, delta=1e-11) | ||
|
||
# Verify errors | ||
full_grid = sgd_state.get_grid() | ||
sgd_state.grid = full_grid[:, -1] | ||
sgd_state.grid_values = sgd_state.grid_values[:-1] | ||
self.assertIsInstance(sgf.gd, HyperhemisphericalGridDistribution) | ||
sgf_tmp = sgf.copy() | ||
|
||
with self.assertRaises(ValueError): | ||
sgf_tmp.set_state(sgd_state) | ||
|
||
with self.assertRaises(ValueError): | ||
sgf.set_state(bd) | ||
|
||
def test_set_state_s3(self): | ||
no_grid_points = 1001 | ||
sgf = HyperhemisphericalGridFilter(no_grid_points, 4) | ||
self.assertEqual(sgf.get_estimate().grid_values.shape, (no_grid_points, 1)) | ||
|
||
# Test if it is uniform at the beginning | ||
self.assertAlmostEqual(np.sum(np.abs(np.diff(sgf.get_estimate().grid_values.T))), 0) | ||
|
||
M = np.eye(4) | ||
Z = np.array([-2, -1, -0.5, 0]).reshape(-1, 1) | ||
bd = BinghamDistribution(Z, M) | ||
bd.F = bd.F * bd.integrate_numerically() | ||
|
||
sgd_state = HyperhemisphericalGridDistribution.from_distribution(bd, no_grid_points) | ||
sgf.set_state(sgd_state) | ||
|
||
# Verify that it is no longer a uniform distribution | ||
self.assertGreater(np.sum(np.abs(np.diff(sgf.get_estimate().grid_values.T))), 5) | ||
def test_predict_converges_to_uniform_S2_S3(self): | ||
test_predict_converges_to_uniform(3, 501, [-2, -1, 0], 3, 5e-5, 'eq_point_set_symm', 6) | ||
test_predict_converges_to_uniform(4, 1001, [-2, -1, -0.5, 0], 5, 1e-3, 'eq_point_set', 8) | ||
def test_predict_converges_to_uniform(dim, no_grid_points, z_values, tol_verify_greater, tol_verify_equal, eq_point_set_type, eq_point_set_arg): | ||
sgf = HyperhemisphericalGridFilter(no_grid_points, dim) | ||
M = np.eye(dim) | ||
Z = np.array(z_values).reshape(-1, 1) | ||
bd = BinghamDistribution(Z, M) | ||
bd.F = bd.F * bd.integrate_numerically() | ||
sgf.set_state(HyperhemisphericalGridDistribution.from_distribution(bd, no_grid_points)) | ||
|
||
# Verify that it is not a uniform distribution | ||
assert sum(abs(np.diff(sgf.get_estimate().grid_values.T))) > tol_verify_greater | ||
|
||
# Predict 10 times with VM-distributed noise | ||
def trans(xkk, xk): | ||
return 2 * np.hstack([HypersphericalMixture([VonMisesFisherDistribution(xk[:, i], 1), VonMisesFisherDistribution(-xk[:, i], 1)], [0.5, 0.5]).pdf(xkk) for i in range(xk.shape[1])]) | ||
|
||
sdsd = SdHalfCondSdHalfGridDistribution.from_function(trans, no_grid_points, True, eq_point_set_type, eq_point_set_arg) | ||
manifold_size = sgf.get_estimate().get_manifold_size() | ||
|
||
for i in range(10): | ||
values_alternative_formula = (manifold_size / sgf.get_estimate().get_grid().shape[1]) * np.sum(sgf.get_estimate().grid_values.T * sdsd.grid_values, axis=1) | ||
sgf.predict_nonlinear_via_transition_density(sdsd) | ||
assert np.allclose(sgf.get_estimate().grid_values, values_alternative_formula, atol=1e-12) | ||
|
||
# Verify that it is approximately uniform now | ||
assert np.isclose(sum(abs(np.diff(sgf.get_estimate().grid_values.T))), 0, atol=tol_verify_equal) | ||
|
||
if __name__ == '__main__': | ||
unittest.main() |