# Copyright 2016 the GPflow authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import tensorflow as tf import numpy as np from numpy.testing import assert_allclose import gpflow from gpflow.test_util import GPflowTestCase from .reference import referenceRbfKernel def univariate_log_marginal_likelihood(y, K, noiseVariance): return (-0.5 * y * y / (K + noiseVariance) -0.5 * np.log(K + noiseVariance) -0.5 * np.log(np.pi * 2.)) def univariate_posterior(y, K, noiseVariance): mean = K * y / (K + noiseVariance) variance = K - K / (K + noiseVariance) return mean, variance def univariate_prior_KL(meanA, meanB, varA, varB): # KL[ qA | qB ] = E_{qA} \log [qA / qB] where qA and qB are univariate normal distributions. return (0.5 * (np.log(varB) - np.log(varA) - 1. + varA/varB + (meanB-meanA) * (meanB - meanA) / varB)) def multivariate_prior_KL(meanA, covA, meanB, covB): # KL[ qA | qB ] = E_{qA} \log [qA / qB] where qA and aB are # K dimensional multivariate normal distributions. # Analytically tractable and equal to... # 0.5 * (Tr(covB^{-1} covA) + (meanB - meanA)^T covB^{-1} (meanB - meanA) # - K + log(det(covB)) - log (det(covA))) K = covA.shape[0] traceTerm = 0.5 * np.trace(np.linalg.solve(covB, covA)) delta = meanB - meanA mahalanobisTerm = 0.5 * np.dot(delta.T, np.linalg.solve(covB, delta)) constantTerm = -0.5 * K priorLogDeterminantTerm = 0.5*np.linalg.slogdet(covB)[1] variationalLogDeterminantTerm = -0.5 * np.linalg.slogdet(covA)[1] return (traceTerm + mahalanobisTerm + constantTerm + priorLogDeterminantTerm + variationalLogDeterminantTerm) def kernel(kernelVariance=1, lengthScale=1.): kern = gpflow.kernels.RBF(1) kern.variance = kernelVariance kern.lengthscales = lengthScale return kern class VariationalUnivariateTest(GPflowTestCase): y_real = 2. K = 1. noiseVariance = 0.5 univariate = 1 oneLatentFunction = 1 meanZero = 0. X = np.atleast_2d(np.array([0.])) Y = np.atleast_2d(np.array([y_real])) Z = X.copy() posteriorMean, posteriorVariance = univariate_posterior( y=y_real, K=K, noiseVariance=noiseVariance) posteriorStd = np.sqrt(posteriorVariance) def likelihood(self): return gpflow.likelihoods.Gaussian(variance=self.noiseVariance) def get_model(self, is_diagonal, is_whitened): m = gpflow.models.SVGP( X=self.X, Y=self.Y, kern=kernel(kernelVariance=self.K), likelihood=self.likelihood(), Z=self.Z, q_diag=is_diagonal, whiten=is_whitened, autobuild=False) if is_diagonal: ones = np.ones((self.univariate, self.univariate, self.oneLatentFunction)) m.q_sqrt = ones * self.posteriorStd else: ones = np.ones((self.univariate, self.univariate, self.oneLatentFunction)) m.q_sqrt = ones * self.posteriorStd m.q_mu = np.ones((self.univariate, self.oneLatentFunction)) * self.posteriorMean m.compile() return m def test_prior_KL(self): with self.test_context(): meanA = self.posteriorMean varA = self.posteriorVariance meanB = self.meanZero # Assumes a zero varB = self.K referenceKL = univariate_prior_KL(meanA, meanB, varA, varB) for is_diagonal in [True, False]: for is_whitened in [True, False]: m = self.get_model(is_diagonal, is_whitened) test_prior_KL = gpflow.autoflow()(m.build_prior_KL.__func__)(m) assert_allclose(referenceKL - test_prior_KL, 0, atol=4) def test_build_likelihood(self): with self.test_context(): # reference marginal likelihood log_marginal_likelihood = univariate_log_marginal_likelihood( y=self.y_real, K=self.K, noiseVariance=self.noiseVariance) for is_diagonal in [True, False]: for is_whitened in [True, False]: model = self.get_model(is_diagonal, is_whitened) model_likelihood = model.compute_log_likelihood() assert_allclose(model_likelihood - log_marginal_likelihood, 0, atol=4) def testUnivariateConditionals(self): with self.test_context() as sess: for is_diagonal in [True, False]: for is_whitened in [True, False]: m = self.get_model(is_diagonal, is_whitened) with gpflow.params_as_tensors_for(m): if is_whitened: fmean_func, fvar_func = gpflow.conditionals.conditional( self.X, self.Z, m.kern, m.q_mu, q_sqrt=m.q_sqrt) else: fmean_func, fvar_func = gpflow.conditionals.conditional( self.X, self.Z, m.kern, m.q_mu, q_sqrt=m.q_sqrt, white=True) mean_value = fmean_func.eval(session=sess)[0, 0] var_value = fvar_func.eval(session=sess)[0, 0] assert_allclose(mean_value - self.posteriorMean, 0, atol=4) assert_allclose(var_value - self.posteriorVariance, 0, atol=4) class VariationalMultivariateTest(GPflowTestCase): nDimensions = 3 rng = np.random.RandomState(1) rng = rng Y = rng.randn(nDimensions, 1) X = rng.randn(nDimensions, 1) Z = X.copy() noiseVariance = 0.5 signalVariance = 1.5 lengthScale = 1.7 oneLatentFunction = 1 q_mean = rng.randn(nDimensions, oneLatentFunction) q_sqrt_diag = rng.rand(nDimensions, oneLatentFunction) q_sqrt_full = np.tril(rng.rand(nDimensions, nDimensions)) def likelihood(self): return gpflow.likelihoods.Gaussian(self.noiseVariance) def get_model(self, is_diagonal, is_whitened): m = gpflow.models.SVGP( X=self.X, Y=self.Y, kern=kernel(kernelVariance=self.signalVariance, lengthScale=self.lengthScale), likelihood=self.likelihood(), Z=self.Z, q_diag=is_diagonal, whiten=is_whitened) if is_diagonal: m.q_sqrt = self.q_sqrt_diag else: m.q_sqrt = self.q_sqrt_full[None, :, :] m.q_mu = self.q_mean return m def test_refrence_implementation_consistency(self): with self.test_context(): rng = np.random.RandomState(10) qMean = rng.randn() qCov = rng.rand() pMean = rng.rand() pCov = rng.rand() univariate_KL = univariate_prior_KL(qMean, pMean, qCov, pCov) multivariate_KL = multivariate_prior_KL( np.array([[qMean]]), np.array([[qCov]]), np.array([[pMean]]), np.array([[pCov]])) assert_allclose(univariate_KL - multivariate_KL, 0, atol=4) def test_prior_KL_fullQ(self): with self.test_context(): covQ = np.dot(self.q_sqrt_full, self.q_sqrt_full.T) mean_prior = np.zeros((self.nDimensions, 1)) for is_whitened in [True, False]: m = self.get_model(False, is_whitened) if is_whitened: cov_prior = np.eye(self.nDimensions) else: cov_prior = referenceRbfKernel( self.X, self.lengthScale, self.signalVariance) referenceKL = multivariate_prior_KL( self.q_mean, covQ, mean_prior, cov_prior) # now get test KL. test_prior_KL = gpflow.autoflow()(m.build_prior_KL.__func__)(m) assert_allclose(referenceKL - test_prior_KL, 0, atol=4) if __name__ == "__main__": tf.test.main()