# Copyright 2017 the GPflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from numpy.testing import assert_allclose
import gpflow
import tensorflow as tf
from gpflow.mean_functions import Constant
rng = np.random.RandomState(0)
class Datum:
N1, N2 = 6, 12
X = [rng.rand(N1, 2) * 1, rng.rand(N2, 2) * 1]
Y = [np.sin(x[:, :1]) + 0.9 * np.cos(x[:, 1:2] * 1.6) + rng.randn(x.shape[0], 1) * 0.8 for x in X]
label = [np.zeros((N1, 1)), np.ones((N2, 1))]
perm = list(range(30))
rng.shuffle(perm)
Xtest = rng.rand(10, 2) * 10
X_augumented = np.hstack([np.concatenate(X), np.concatenate(label)])
Y_augumented = np.hstack([np.concatenate(Y), np.concatenate(label)])
# For predict tests
X_augumented0 = np.hstack([Xtest, np.zeros((Xtest.shape[0], 1))])
X_augumented1 = np.hstack([Xtest, np.ones((Xtest.shape[0], 1))])
Ytest = [np.sin(x) + 0.9 * np.cos(x * 1.6) for x in Xtest]
Y_augumented0 = np.hstack([Ytest, np.zeros((Xtest.shape[0], 1))])
Y_augumented1 = np.hstack([Ytest, np.ones((Xtest.shape[0], 1))])
def _prepare_models():
"""
Prepare models to make sure the coregionalized model with diagonal coregion kernel and
with fixed lengthscale is equivalent with normal GP regression.
"""
# 1. Two independent VGPs for two sets of data
k0 = gpflow.kernels.SquaredExponential()
k0.lengthscale.trainable = False
k1 = gpflow.kernels.SquaredExponential()
k1.lengthscale.trainable = False
vgp0 = gpflow.models.VGP((Datum.X[0], Datum.Y[0]),
kernel=k0,
mean_function=Constant(),
likelihood=gpflow.likelihoods.Gaussian(), num_latent=1)
vgp1 = gpflow.models.VGP((Datum.X[1], Datum.Y[1]),
kernel=k1,
mean_function=Constant(),
likelihood=gpflow.likelihoods.Gaussian(), num_latent=1)
# 2. Coregionalized GPR
kc = gpflow.kernels.SquaredExponential(active_dims=[0, 1])
kc.lengthscale.trainable = False
kc.variance.trainable = False # variance is handles by the coregion kernel
coreg = gpflow.kernels.Coregion(output_dim=2, rank=1, active_dims=[2])
coreg.W.trainable = False
lik = gpflow.likelihoods.SwitchedLikelihood([gpflow.likelihoods.Gaussian(),
gpflow.likelihoods.Gaussian()]
)
mean_c = gpflow.mean_functions.SwitchedMeanFunction(
[gpflow.mean_functions.Constant(), gpflow.mean_functions.Constant()])
cvgp = gpflow.models.VGP((Datum.X_augumented, Datum.Y_augumented),
kernel=kc * coreg,
mean_function=mean_c,
likelihood=lik,
num_latent=1
)
# Train them for a small number of iterations
opt = gpflow.optimizers.Scipy()
@tf.function(autograph=False)
def vgp0_closure():
return - vgp0.log_marginal_likelihood()
@tf.function(autograph=False)
def vgp1_closure():
return - vgp1.log_marginal_likelihood()
@tf.function(autograph=False)
def cvgp_closure():
return - cvgp.log_marginal_likelihood()
opt.minimize(vgp0_closure, variables=vgp0.trainable_variables,
options=dict(maxiter=1000), method='BFGS')
opt.minimize(vgp1_closure, variables=vgp1.trainable_variables,
options=dict(maxiter=1000), method='BFGS')
opt.minimize(cvgp_closure, variables=cvgp.trainable_variables,
options=dict(maxiter=1000), method='BFGS')
return vgp0, vgp1, cvgp
# ------------------------------------------
# Tests
# ------------------------------------------
def test_likelihood_variance():
vgp0, vgp1, cvgp = _prepare_models()
assert_allclose(vgp0.likelihood.variance.read_value(),
cvgp.likelihood.likelihoods[0].variance.read_value(),
atol=1e-2)
assert_allclose(vgp1.likelihood.variance.read_value(),
cvgp.likelihood.likelihoods[1].variance.read_value(),
atol=1e-2)
def test_kernel_variance():
vgp0, vgp1, cvgp = _prepare_models()
assert_allclose(vgp0.kernel.variance.read_value(),
cvgp.kernel.kernels[1].kappa.read_value()[0],
atol=1.0e-4)
assert_allclose(vgp1.kernel.variance.read_value(),
cvgp.kernel.kernels[1].kappa.read_value()[1],
atol=1.0e-4)
def test_mean_values():
vgp0, vgp1, cvgp = _prepare_models()
assert_allclose(vgp0.mean_function.c.read_value(),
cvgp.mean_function.meanfunctions[0].c.read_value(),
atol=1.0e-4)
assert_allclose(vgp1.mean_function.c.read_value(),
cvgp.mean_function.meanfunctions[1].c.read_value(),
atol=1.0e-4)
def test_predict_f():
vgp0, vgp1, cvgp = _prepare_models()
pred_f0 = vgp0.predict_f(Datum.Xtest)
pred_fc0 = cvgp.predict_f(Datum.X_augumented0)
assert_allclose(pred_f0, pred_fc0, atol=1.0e-4)
pred_f1 = vgp1.predict_f(Datum.Xtest)
pred_fc1 = cvgp.predict_f(Datum.X_augumented1)
assert_allclose(pred_f1, pred_fc1, atol=1.0e-4)
# check predict_f_full_cov
vgp0.predict_f(Datum.Xtest, full_cov=True)
cvgp.predict_f(Datum.X_augumented0, full_cov=True)
vgp1.predict_f(Datum.Xtest, full_cov=True)
cvgp.predict_f(Datum.X_augumented1, full_cov=True)
def test_predict_y():
vgp0, vgp1, cvgp = _prepare_models()
mu1, var1 = vgp0.predict_y(Datum.Xtest)
c_mu1, c_var1 = cvgp.predict_y(
np.hstack([Datum.Xtest, np.zeros((Datum.Xtest.shape[0], 1))]))
# predict_y returns results for all the likelihoods in multi_likelihood
assert_allclose(mu1, c_mu1[:, :1], atol=1.0e-4)
assert_allclose(var1, c_var1[:, :1], atol=1.0e-4)
mu2, var2 = vgp1.predict_y(Datum.Xtest)
c_mu2, c_var2 = cvgp.predict_y(
np.hstack([Datum.Xtest, np.ones((Datum.Xtest.shape[0], 1))]))
# predict_y returns results for all the likelihoods in multi_likelihood
assert_allclose(mu2, c_mu2[:, 1:2], atol=1.0e-4)
assert_allclose(var2, c_var2[:, 1:2], atol=1.0e-4)
def test_predict_log_density():
vgp0, vgp1, cvgp = _prepare_models()
pred_ydensity0 = vgp0.predict_log_density((Datum.Xtest, Datum.Ytest))
pred_ydensity_c0 = cvgp.predict_log_density((Datum.X_augumented0, Datum.Y_augumented0))
assert_allclose(pred_ydensity0, pred_ydensity_c0, atol=1e-2)
pred_ydensity1 = vgp1.predict_log_density((Datum.Xtest, Datum.Ytest))
pred_ydensity_c1 = cvgp.predict_log_density((Datum.X_augumented1, Datum.Y_augumented1))
assert_allclose(pred_ydensity1, pred_ydensity_c1, atol=1e-2)
def test_predict_f_samples():
vgp0, vgp1, cvgp = _prepare_models()
# just check predict_f_samples(self) works
cvgp.predict_f_samples(Datum.X_augumented0, 1)
cvgp.predict_f_samples(Datum.X_augumented1, 1)