# Copyright 2016 the GPflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tensorflow as tf
import numpy as np
from numpy.testing import assert_allclose
import gpflow
from gpflow.test_util import GPflowTestCase
from .reference import referenceRbfKernel
def univariate_log_marginal_likelihood(y, K, noiseVariance):
return (-0.5 * y * y / (K + noiseVariance)
-0.5 * np.log(K + noiseVariance)
-0.5 * np.log(np.pi * 2.))
def univariate_posterior(y, K, noiseVariance):
mean = K * y / (K + noiseVariance)
variance = K - K / (K + noiseVariance)
return mean, variance
def univariate_prior_KL(meanA, meanB, varA, varB):
# KL[ qA | qB ] = E_{qA} \log [qA / qB] where qA and qB are univariate normal distributions.
return (0.5 * (np.log(varB) - np.log(varA) - 1. + varA/varB +
(meanB-meanA) * (meanB - meanA) / varB))
def multivariate_prior_KL(meanA, covA, meanB, covB):
# KL[ qA | qB ] = E_{qA} \log [qA / qB] where qA and aB are
# K dimensional multivariate normal distributions.
# Analytically tractable and equal to...
# 0.5 * (Tr(covB^{-1} covA) + (meanB - meanA)^T covB^{-1} (meanB - meanA)
# - K + log(det(covB)) - log (det(covA)))
K = covA.shape[0]
traceTerm = 0.5 * np.trace(np.linalg.solve(covB, covA))
delta = meanB - meanA
mahalanobisTerm = 0.5 * np.dot(delta.T, np.linalg.solve(covB, delta))
constantTerm = -0.5 * K
priorLogDeterminantTerm = 0.5*np.linalg.slogdet(covB)[1]
variationalLogDeterminantTerm = -0.5 * np.linalg.slogdet(covA)[1]
return (traceTerm +
mahalanobisTerm +
constantTerm +
priorLogDeterminantTerm +
variationalLogDeterminantTerm)
def kernel(kernelVariance=1, lengthScale=1.):
kern = gpflow.kernels.RBF(1)
kern.variance = kernelVariance
kern.lengthscales = lengthScale
return kern
class VariationalUnivariateTest(GPflowTestCase):
y_real = 2.
K = 1.
noiseVariance = 0.5
univariate = 1
oneLatentFunction = 1
meanZero = 0.
X = np.atleast_2d(np.array([0.]))
Y = np.atleast_2d(np.array([y_real]))
Z = X.copy()
posteriorMean, posteriorVariance = univariate_posterior(
y=y_real, K=K, noiseVariance=noiseVariance)
posteriorStd = np.sqrt(posteriorVariance)
def likelihood(self):
return gpflow.likelihoods.Gaussian(variance=self.noiseVariance)
def get_model(self, is_diagonal, is_whitened):
m = gpflow.models.SVGP(
X=self.X, Y=self.Y,
kern=kernel(kernelVariance=self.K),
likelihood=self.likelihood(),
Z=self.Z,
q_diag=is_diagonal,
whiten=is_whitened,
autobuild=False)
if is_diagonal:
ones = np.ones((self.univariate, self.univariate, self.oneLatentFunction))
m.q_sqrt = ones * self.posteriorStd
else:
ones = np.ones((self.univariate, self.univariate, self.oneLatentFunction))
m.q_sqrt = ones * self.posteriorStd
m.q_mu = np.ones((self.univariate, self.oneLatentFunction)) * self.posteriorMean
m.compile()
return m
def test_prior_KL(self):
with self.test_context():
meanA = self.posteriorMean
varA = self.posteriorVariance
meanB = self.meanZero # Assumes a zero
varB = self.K
referenceKL = univariate_prior_KL(meanA, meanB, varA, varB)
for is_diagonal in [True, False]:
for is_whitened in [True, False]:
m = self.get_model(is_diagonal, is_whitened)
test_prior_KL = gpflow.autoflow()(m.build_prior_KL.__func__)(m)
assert_allclose(referenceKL - test_prior_KL, 0, atol=4)
def test_build_likelihood(self):
with self.test_context():
# reference marginal likelihood
log_marginal_likelihood = univariate_log_marginal_likelihood(
y=self.y_real, K=self.K, noiseVariance=self.noiseVariance)
for is_diagonal in [True, False]:
for is_whitened in [True, False]:
model = self.get_model(is_diagonal, is_whitened)
model_likelihood = model.compute_log_likelihood()
assert_allclose(model_likelihood - log_marginal_likelihood, 0, atol=4)
def testUnivariateConditionals(self):
with self.test_context() as sess:
for is_diagonal in [True, False]:
for is_whitened in [True, False]:
m = self.get_model(is_diagonal, is_whitened)
with gpflow.params_as_tensors_for(m):
if is_whitened:
fmean_func, fvar_func = gpflow.conditionals.conditional(
self.X, self.Z, m.kern, m.q_mu, q_sqrt=m.q_sqrt)
else:
fmean_func, fvar_func = gpflow.conditionals.conditional(
self.X, self.Z, m.kern, m.q_mu, q_sqrt=m.q_sqrt, white=True)
mean_value = fmean_func.eval(session=sess)[0, 0]
var_value = fvar_func.eval(session=sess)[0, 0]
assert_allclose(mean_value - self.posteriorMean, 0, atol=4)
assert_allclose(var_value - self.posteriorVariance, 0, atol=4)
class VariationalMultivariateTest(GPflowTestCase):
nDimensions = 3
rng = np.random.RandomState(1)
rng = rng
Y = rng.randn(nDimensions, 1)
X = rng.randn(nDimensions, 1)
Z = X.copy()
noiseVariance = 0.5
signalVariance = 1.5
lengthScale = 1.7
oneLatentFunction = 1
q_mean = rng.randn(nDimensions, oneLatentFunction)
q_sqrt_diag = rng.rand(nDimensions, oneLatentFunction)
q_sqrt_full = np.tril(rng.rand(nDimensions, nDimensions))
def likelihood(self):
return gpflow.likelihoods.Gaussian(self.noiseVariance)
def get_model(self, is_diagonal, is_whitened):
m = gpflow.models.SVGP(
X=self.X, Y=self.Y,
kern=kernel(kernelVariance=self.signalVariance, lengthScale=self.lengthScale),
likelihood=self.likelihood(),
Z=self.Z,
q_diag=is_diagonal,
whiten=is_whitened)
if is_diagonal:
m.q_sqrt = self.q_sqrt_diag
else:
m.q_sqrt = self.q_sqrt_full[None, :, :]
m.q_mu = self.q_mean
return m
def test_refrence_implementation_consistency(self):
with self.test_context():
rng = np.random.RandomState(10)
qMean = rng.randn()
qCov = rng.rand()
pMean = rng.rand()
pCov = rng.rand()
univariate_KL = univariate_prior_KL(qMean, pMean, qCov, pCov)
multivariate_KL = multivariate_prior_KL(
np.array([[qMean]]), np.array([[qCov]]),
np.array([[pMean]]), np.array([[pCov]]))
assert_allclose(univariate_KL - multivariate_KL, 0, atol=4)
def test_prior_KL_fullQ(self):
with self.test_context():
covQ = np.dot(self.q_sqrt_full, self.q_sqrt_full.T)
mean_prior = np.zeros((self.nDimensions, 1))
for is_whitened in [True, False]:
m = self.get_model(False, is_whitened)
if is_whitened:
cov_prior = np.eye(self.nDimensions)
else:
cov_prior = referenceRbfKernel(
self.X, self.lengthScale, self.signalVariance)
referenceKL = multivariate_prior_KL(
self.q_mean, covQ, mean_prior, cov_prior)
# now get test KL.
test_prior_KL = gpflow.autoflow()(m.build_prior_KL.__func__)(m)
assert_allclose(referenceKL - test_prior_KL, 0, atol=4)
if __name__ == "__main__":
tf.test.main()