https://github.com/GPflow/GPflow
Revision 5b16a12e3a21e1435badfe99c4821444260df2fa authored by John Bradshaw on 23 October 2017, 10:32:06 UTC, committed by John Bradshaw on 23 October 2017, 10:32:06 UTC
the future.

* EP while loop broken out and made into seperate class to allow it to
be easy to reuse elsewhere. Tidy up of the variable names and made more
general. Tests written to check that it can recover the true solution
when the approximating factors have the same solution as the true
solution.
* Central moment functions moved out of EP class and live elsewhere.
Step function, Gaussian and Gaussian CDF moments implemented all with
tests.
* Likelihoods now are associated with a method that returns their central
moments generator function. Only implemented for Bernoulli but this
allows one to store moment logic with the likelihood function which is
more where it belongs.
* Updated the VGP versus EP binary classification example script to use
this new source structure. This has remained in the docs so have not
written tests for this part.
1 parent f46eeb3
Raw File
Tip revision: 5b16a12e3a21e1435badfe99c4821444260df2fa authored by John Bradshaw on 23 October 2017, 10:32:06 UTC
Refactor of EP implementation to modularise and make more extensible in
Tip revision: 5b16a12
test_coregion.py
from __future__ import print_function
import gpflow
import numpy as np
import unittest
import tensorflow as tf

from gpflow.test_util import GPflowTestCase
from nose.plugins.attrib import attr


@attr(speed='slow')
class TestEquivalence(GPflowTestCase):
    """
    Here we make sure the coregionalized model with diagonal coregion kernel and
    with fixed lengthscale is equivalent with normal GP regression.
    """
    def setUp(self):
        with self.test_context():
            rng = np.random.RandomState(0)
            X = [rng.rand(10, 2)*10, rng.rand(20, 2)*10]
            Y = [np.sin(x) + 0.9 * np.cos(x*1.6) + rng.randn(*x.shape) * 0.8 for x in X]
            label = [np.zeros((10, 1)), np.ones((20, 1))]
            perm = list(range(30))
            rng.shuffle(perm)
            self.Xtest = rng.rand(10, 2)*10

            X_augumented = np.hstack([np.concatenate(X), np.concatenate(label)])
            Y_augumented = np.hstack([np.concatenate(Y), np.concatenate(label)])

            # two independent vgps for two sets of data
            k0 = gpflow.kernels.RBF(2)
            k0.lengthscales.fixed = True
            self.vgp0 = gpflow.models.VGP(X[0], Y[0], kern=k0,
                                       mean_function=gpflow.mean_functions.Constant(),
                                       likelihood=gpflow.likelihoods.Gaussian())
            k1 = gpflow.kernels.RBF(2)
            k1.lengthscales.fixed = True
            self.vgp1 = gpflow.models.VGP(X[1], Y[1], kern=k1,
                                       mean_function=gpflow.mean_functions.Constant(),
                                       likelihood=gpflow.likelihoods.Gaussian())
            # coregionalized gpr
            lik = gpflow.likelihoods.SwitchedLikelihood(
                [gpflow.likelihoods.Gaussian(), gpflow.likelihoods.Gaussian()])

            kc = gpflow.kernels.RBF(2)
            kc.fixed = True  # lengthscale and variance is fixed.
            coreg = gpflow.kernels.Coregion(1, output_dim=2, rank=1, active_dims=[2])
            coreg.W.fixed = True

            mean_c = gpflow.mean_functions.SwitchedMeanFunction(
                [gpflow.mean_functions.Constant(), gpflow.mean_functions.Constant()])
            self.cvgp = gpflow.models.VGP(X_augumented, Y_augumented,
                                       kern=kc*coreg,
                                       mean_function=mean_c,
                                       likelihood=lik,
                                       num_latent=2)

            self.vgp0.optimize(disp=False, maxiter=300)
            self.vgp1.optimize(disp=False, maxiter=300)
            self.cvgp.optimize(disp=False, maxiter=300)

    def test_all(self):
        with self.test_context():
            # check variance
            self.assertTrue(np.allclose(
                self.vgp0.likelihood.variance.value,
                self.cvgp.likelihood.likelihood_list[0].variance.value,
                atol=1e-2))
            self.assertTrue(np.allclose(
                self.vgp1.likelihood.variance.value,
                self.cvgp.likelihood.likelihood_list[1].variance.value,
                atol=1e-2))

            # check kernel variance
            self.assertTrue(np.allclose(
                self.vgp0.kern.variance.value,
                self.cvgp.kern.coregion.kappa.value[0],
                atol=1.0e-2))
            self.assertTrue(np.allclose(
                self.vgp1.kern.variance.value,
                self.cvgp.kern.coregion.kappa.value[1],
                atol=1.0e-2))

            # check mean values
            self.assertTrue(np.allclose(
                self.vgp0.mean_function.c.value,
                self.cvgp.mean_function.meanfunction_list[0].c.value,
                atol=1.0e-2))
            self.assertTrue(np.allclose(
                self.vgp1.mean_function.c.value,
                self.cvgp.mean_function.meanfunction_list[1].c.value,
                atol=1.0e-2))

            X_augumented0 = np.hstack([self.Xtest, np.zeros((self.Xtest.shape[0], 1))])
            X_augumented1 = np.hstack([self.Xtest, np.ones((self.Xtest.shape[0], 1))])
            Ytest = [np.sin(x) + 0.9 * np.cos(x*1.6) for x in self.Xtest]
            Y_augumented0 = np.hstack([Ytest, np.zeros((self.Xtest.shape[0], 1))])
            Y_augumented1 = np.hstack([Ytest, np.ones((self.Xtest.shape[0], 1))])

            # check predict_f
            pred_f0 = self.vgp0.predict_f(self.Xtest)
            pred_fc0 = self.cvgp.predict_f(X_augumented0)
            self.assertTrue(np.allclose(pred_f0, pred_fc0, atol=1.0e-2))
            pred_f1 = self.vgp1.predict_f(self.Xtest)
            pred_fc1 = self.cvgp.predict_f(X_augumented1)
            self.assertTrue(np.allclose(pred_f1, pred_fc1, atol=1.0e-2))

            # check predict y
            pred_y0 = self.vgp0.predict_y(self.Xtest)
            pred_yc0 = self.cvgp.predict_y(np.hstack([self.Xtest, np.zeros((self.Xtest.shape[0], 1))]))

            # predict_y returns results for all the likelihodds in multi_likelihood
            self.assertTrue(np.allclose(pred_y0[0], pred_yc0[0][:, :np.array(Ytest).shape[1]], atol=1.0e-2))
            self.assertTrue(np.allclose(pred_y0[1], pred_yc0[1][:, :np.array(Ytest).shape[1]], atol=1.0e-2))
            pred_y1 = self.vgp1.predict_y(self.Xtest)
            pred_yc1 = self.cvgp.predict_y(np.hstack([self.Xtest, np.ones((self.Xtest.shape[0], 1))]))
            # predict_y returns results for all the likelihodds in multi_likelihood
            self.assertTrue(np.allclose(pred_y1[0], pred_yc1[0][:, np.array(Ytest).shape[1]:], atol=1.0e-2))
            self.assertTrue(np.allclose(pred_y1[1], pred_yc1[1][:, np.array(Ytest).shape[1]:], atol=1.0e-2))

            # check predict_density
            pred_ydensity0 = self.vgp0.predict_density(self.Xtest, Ytest)
            pred_ydensity_c0 = self.cvgp.predict_density(X_augumented0, Y_augumented0)
            self.assertTrue(np.allclose(pred_ydensity0, pred_ydensity_c0, atol=1e-2))
            pred_ydensity1 = self.vgp1.predict_density(self.Xtest, Ytest)
            pred_ydensity_c1 = self.cvgp.predict_density(X_augumented1, Y_augumented1)
            self.assertTrue(np.allclose(pred_ydensity1, pred_ydensity_c1, atol=1e-2))

            # just check predict_f_samples(self) works
            self.cvgp.predict_f_samples(X_augumented0, 1)
            self.cvgp.predict_f_samples(X_augumented1, 1)

            # check predict_f_full_cov
            self.vgp0.predict_f_full_cov(self.Xtest)
            self.cvgp.predict_f_full_cov(X_augumented0)
            self.vgp1.predict_f_full_cov(self.Xtest)
            self.cvgp.predict_f_full_cov(X_augumented1)


if __name__ == '__main__':
    unittest.main()
back to top