https://github.com/GPflow/GPflow
Revision d55e4525393a2bb3e25c00a3bb1b5b258ffc0549 authored by Artem Artemev on 05 January 2019, 23:35:21 UTC, committed by Artem Artemev on 05 January 2019, 23:35:21 UTC
1 parent f01a776
Tip revision: d55e4525393a2bb3e25c00a3bb1b5b258ffc0549 authored by Artem Artemev on 05 January 2019, 23:35:21 UTC
Add kernel tests
Add kernel tests
Tip revision: d55e452
test_kernels.py
# Copyright 2017 the GPflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import numpy as np
import tensorflow as tf
from numpy.testing import assert_allclose
import gpflow
from gpflow.kernels import (RBF, ArcCosine, Constant, Coregion, Linear,
Matern32, Matern52, Periodic, Polynomial,
RationalQuadratic, Stationary)
from gpflow.test_util import GPflowTestCase
def ref_rbf(X, lengthScale, signalVariance):
    """Reference squared-exponential (RBF) Gram matrix computed directly from the formula.

    K[i, j] = signalVariance * exp(-0.5 * ||x_i - x_j||^2 / lengthScale^2)
    """
    pairwise_diffs = X[:, None, :] - X[None, :, :]
    squared_dists = np.sum(pairwise_diffs ** 2, axis=-1)
    return signalVariance * np.exp(-0.5 * squared_dists / lengthScale ** 2)
def ref_arccosine(X, order, weightVariances, biasVariance, signalVariance):
num_points = X.shape[0]
kernel = np.empty((num_points, num_points))
for row in range(num_points):
for col in range(num_points):
x = X[row]
y = X[col]
numerator = (weightVariances * x).dot(y) + biasVariance
x_denominator = np.sqrt((weightVariances * x).dot(x) + biasVariance)
y_denominator = np.sqrt((weightVariances * y).dot(y) + biasVariance)
denominator = x_denominator * y_denominator
theta = np.arccos(np.clip(numerator / denominator, -1., 1.))
if order == 0:
J = np.pi - theta
elif order == 1:
J = np.sin(theta) + (np.pi - theta) * np.cos(theta)
elif order == 2:
J = 3. * np.sin(theta) * np.cos(theta)
J += (np.pi - theta) * (1. + 2. * np.cos(theta) ** 2)
kernel[row, col] = signalVariance * (1. / np.pi) * J * \
x_denominator ** order * \
y_denominator ** order
return kernel
def ref_periodic(X, lengthScale, signalVariance, period):
    """Reference periodic kernel, following GPy's standard_period kernel."""
    angles = (np.pi / period) * (X[:, None, :] - X[None, :, :])
    scaled_sines = np.sin(angles) / lengthScale
    return signalVariance * np.exp(-0.5 * np.sum(scaled_sines ** 2, axis=-1))
def test_rbf_1d():
    """GPflow's RBF Gram matrix must match the naive NumPy reference.

    Note: the stray `self` parameter was removed — this is a module-level
    pytest-style function, and a required `self` argument breaks collection.
    """
    lengthscale = 1.4
    variance = 2.3
    kernel = gpflow.kernels.RBF(1, lengthscales=lengthscale, variance=variance)
    rng = np.random.RandomState(1)
    X = rng.randn(3, 1)
    gram_matrix = kernel(X)
    reference_gram_matrix = ref_rbf(X, lengthscale, variance)

    assert_allclose(gram_matrix, reference_gram_matrix)
def test_rq_1d():
    """A RationalQuadratic kernel with very large alpha should converge to the RBF kernel.

    Note: the stray `self` parameter was removed — this is a module-level
    pytest-style function, and a required `self` argument breaks collection.
    """
    lengthscale = 1.4
    variance = 2.3
    kSE = gpflow.kernels.RBF(1, lengthscales=lengthscale, variance=variance)
    kRQ = gpflow.kernels.RationalQuadratic(1, lengthscales=lengthscale, variance=variance, alpha=1e8)
    rng = np.random.RandomState(1)
    X = rng.randn(6, 1).astype(gpflow.default_float())
    gram_matrix_SE = kSE(X)
    gram_matrix_RQ = kRQ(X)

    assert_allclose(gram_matrix_SE, gram_matrix_RQ)
class TestArcCosine(GPflowTestCase):
    """Compare the GPflow ArcCosine kernel against the NumPy reference implementation."""

    def evalKernelError(self, D, variance, weight_variances, bias_variance, order, ARD, X):
        # Renamed from `assert_kern_err`: every test method below calls
        # self.evalKernelError, so the old name raised AttributeError.
        # (The name also matches TestPeriodic.evalKernelError.)
        kernel = gpflow.kernels.ArcCosine(
            D,
            order=order,
            variance=variance,
            weight_variances=weight_variances,
            bias_variance=bias_variance,
            ARD=ARD)

        if weight_variances is None:
            # The kernel defaults unspecified weight variances to 1;
            # mirror that for the reference computation.
            weight_variances = 1.
        gram_matrix = kernel(X)
        reference_gram_matrix = ref_arccosine(X, order, weight_variances, bias_variance, variance)
        assert_allclose(gram_matrix, reference_gram_matrix)

    def test_1d(self):
        # Scalar weight variance, all implemented orders.
        with self.test_context():
            D = 1
            N = 3
            weight_variances = 1.7
            bias_variance = 0.6
            variance = 2.3
            ARD = False
            orders = gpflow.kernels.ArcCosine.implemented_orders

            rng = np.random.RandomState(1)
            X_data = rng.randn(N, D)
            for order in orders:
                self.evalKernelError(D, variance, weight_variances,
                                     bias_variance, order, ARD, X_data)

    def test_3d(self):
        # Per-dimension (ARD) weight variances, all implemented orders.
        with self.test_context():
            D = 3
            N = 8
            weight_variances = np.array([0.4, 4.2, 2.3])
            bias_variance = 1.9
            variance = 1e-2
            ARD = True
            orders = gpflow.kernels.ArcCosine.implemented_orders

            rng = np.random.RandomState(1)
            X_data = rng.randn(N, D)
            for order in orders:
                self.evalKernelError(D, variance, weight_variances,
                                     bias_variance, order, ARD, X_data)

    def test_non_implemented_order(self):
        # Orders outside implemented_orders must be rejected at construction.
        with self.test_context():
            with self.assertRaises(ValueError):
                gpflow.kernels.ArcCosine(1, order=42)

    def test_weight_initializations(self):
        # The kernel must accept both ARD=False and ARD=True with the
        # same scalar weight variance.
        with self.test_context():
            D = 1
            N = 3
            weight_variances = 1.
            bias_variance = 1.
            variance = 1.
            ARDs = {False, True}
            order = 0

            rng = np.random.RandomState(1)
            X_data = rng.randn(N, D)
            for ARD in ARDs:
                self.evalKernelError(
                    D, variance, weight_variances,
                    bias_variance, order, ARD, X_data)

    def test_nan_in_gradient(self):
        # Regression test: gradients of the kernel w.r.t. the inputs must
        # be finite (the arccos can produce NaNs without clipping).
        with self.test_context() as session:
            D = 1
            N = 4

            rng = np.random.RandomState(23)
            X_data = rng.rand(N, D)
            kernel = gpflow.kernels.ArcCosine(D)

            X = tf.placeholder(tf.float64)
            kernel.compile()
            grads = tf.gradients(kernel(X), X)
            gradients = session.run(grads, feed_dict={X: X_data})
            self.assertFalse(np.any(np.isnan(gradients)))
class TestPeriodic(GPflowTestCase):
    """Compare the GPflow Periodic kernel against the NumPy reference implementation."""

    def setUp(self):
        # Each test runs in a fresh TF graph.
        self.test_graph = tf.Graph()

    def evalKernelError(self, D, lengthscale, variance, period, X_data):
        # Build a Periodic kernel, evaluate its Gram matrix in a session,
        # and compare against ref_periodic computed in NumPy.
        with self.test_context() as session:
            kernel = gpflow.kernels.Periodic(
                D, period=period, variance=variance, lengthscales=lengthscale)

            X = tf.placeholder(gpflow.default_float())
            reference_gram_matrix = ref_periodic(
                X_data, lengthscale, variance, period)

            kernel.compile()
            gram_matrix = session.run(kernel(X), feed_dict={X: X_data})
            assert_allclose(gram_matrix, reference_gram_matrix)

    def test_1d(self):
        # One-dimensional inputs, scalar hyperparameters.
        with self.test_context():
            D = 1
            lengthScale = 2.0
            variance = 2.3
            period = 2.
            rng = np.random.RandomState(1)
            X_data = rng.randn(3, 1)
            self.evalKernelError(D, lengthScale, variance, period, X_data)

    def test_2d(self):
        # Two-dimensional inputs drawn from a standard normal.
        with self.test_context():
            D = 2
            N = 5
            lengthScale = 11.5
            variance = 1.3
            period = 20.
            rng = np.random.RandomState(1)
            X_data = rng.multivariate_normal(np.zeros(D), np.eye(D), N)
            self.evalKernelError(D, lengthScale, variance, period, X_data)
class TestCoregion(GPflowTestCase):
    """Tests for the Coregion kernel: output shapes, diagonal, and active_dims slicing."""

    def setUp(self):
        self.test_graph = tf.Graph()
        with self.test_context():
            self.rng = np.random.RandomState(0)
            # Coregion kernel over 3 outputs with rank-2 W plus diagonal kappa.
            self.k = gpflow.kernels.Coregion(1, output_dim=3, rank=2)
            self.k.W = self.rng.randn(3, 2)
            self.k.kappa = self.rng.rand(3) + 1.
            # Inputs are integer output indices in [0, 3).
            self.X = np.random.randint(0, 3, (10, 1))
            self.X2 = np.random.randint(0, 3, (12, 1))

    def tearDown(self):
        GPflowTestCase.tearDown(self)
        # Release the compiled kernel's graph resources.
        self.k.clear()

    def test_shape(self):
        # Cross- and symmetric Gram matrices must have the expected shapes.
        with self.test_context():
            self.k.compile()
            K = self.k.compute_K(self.X, self.X2)
            self.assertTrue(K.shape == (10, 12))
            K = self.k.compute_K_symm(self.X)
            self.assertTrue(K.shape == (10, 10))

    def test_diag(self):
        # Kdiag must agree with the diagonal of the full Gram matrix.
        with self.test_context():
            self.k.compile()
            K = self.k.compute_K_symm(self.X)
            Kdiag = self.k.compute_Kdiag(self.X)
            self.assertTrue(np.allclose(np.diag(K), Kdiag))

    def test_slice(self):
        with self.test_context():
            # compute another kernel with additional inputs,
            # make sure our kernel is still okay.
            X = np.hstack((self.X, self.rng.randn(10, 1)))
            k1 = gpflow.kernels.Coregion(1, 3, 2, active_dims=[0])
            k2 = gpflow.kernels.RBF(1, active_dims=[1])
            k = k1 * k2
            k.compile()
            K1 = k.compute_K_symm(X)
            K2 = k1.compute_K_symm(X) * k2.compute_K_symm(X)  # slicing happens inside kernel
            self.assertTrue(np.allclose(K1, K2))
class TestKernSymmetry(GPflowTestCase):
    """K(X) and K(X, X) must produce the same matrix for all basic kernels."""

    def setUp(self):
        self.test_graph = tf.Graph()
        self.kernels = [gpflow.kernels.Constant,
                        gpflow.kernels.Linear,
                        gpflow.kernels.Polynomial,
                        gpflow.kernels.ArcCosine]
        self.kernels += gpflow.kernels.Stationary.__subclasses__()
        self.rng = np.random.RandomState()

    def _check_symmetry(self, dim):
        # Shared body for test_1d / test_5d (previously duplicated verbatim):
        # evaluate the symmetric code path K(X) and the general one K(X, X)
        # for every kernel class and require that they agree.
        with self.test_context() as session:
            kernels = [K(dim) for K in self.kernels]
            for kernel in kernels:
                kernel.compile()
            X = tf.placeholder(tf.float64)
            X_data = self.rng.randn(10, dim)
            for k in kernels:
                errors = session.run(k(X) - k(X, X), feed_dict={X: X_data})
                self.assertTrue(np.allclose(errors, 0))

    def test_1d(self):
        self._check_symmetry(1)

    def test_5d(self):
        self._check_symmetry(5)
def kernels():
    """Build a list of kernel instances covering the basic kernel classes,
    the ArcCosine orders, and a few sum/product combinations.

    Uses feature dimension 3 for the ARD Linear kernel's variance vector.
    """
    rng = np.random.RandomState(1)
    dim = 3
    ks = [k() for k in Stationary.__subclasses__() + [Constant, Linear, Polynomial, Periodic]]
    orders = ArcCosine.implemented_orders
    ks += [ArcCosine(order=order) for order in orders]
    # `ARD=True` (was lowercase `ard=True`, inconsistent with the keyword
    # used by every other kernel construction in this file).
    ks += [RBF() + Linear(),
           RBF() * Linear(),
           RBF() + Linear(ARD=True, variance=rng.rand(dim))]
    return ks
def test_diags(kernel):
    """The diagonal code path must match the diagonal of the full Gram matrix.

    NOTE(review): the original body referenced undefined names (`self`,
    `session`, `k`), ignored its `kernel` argument, and compared a full
    matrix against its own diagonal. Rewritten in the eager style used by
    the module-level tests above. Confirm the diagonal accessor name
    (`Kdiag`) against the kernel API at this revision.
    """
    dim = 3  # feature dimension used by the kernels() fixture above
    rng = np.random.RandomState(1)
    X = rng.randn(30, dim)
    gram_matrix = kernel(X)
    assert_allclose(np.diag(np.asarray(gram_matrix)), kernel.Kdiag(X))
class TestAdd(GPflowTestCase):
    """
    Add an RBF and a Linear kernel, and make sure the result equals the sum
    of the Gram matrices of the two kernels evaluated separately.
    """

    def setUp(self):
        self.test_graph = tf.Graph()
        with self.test_context():
            rbf = gpflow.kernels.RBF(1)
            lin = gpflow.kernels.Linear(1)
            summed = (gpflow.kernels.RBF(1, name='RBFInAdd') +
                      gpflow.kernels.Linear(1, name='LinearInAdd'))
            self.rng = np.random.RandomState(0)
            # [standalone RBF, standalone Linear, RBF + Linear]
            self.kernels = [rbf, lin, summed]

    def test_sym(self):
        # Symmetric case: K(X) of the sum equals the sum of K(X)s.
        with self.test_context() as session:
            X = tf.placeholder(tf.float64)
            X_data = self.rng.randn(10, 1)
            evaluated = []
            for kernel in self.kernels:
                kernel.compile()
                evaluated.append(session.run(kernel(X), feed_dict={X: X_data}))
            rbf_K, lin_K, sum_K = evaluated
            self.assertTrue(np.allclose(rbf_K + lin_K, sum_K))

    def test_asym(self):
        # Cross-covariance case: K(X, Z) of the sum equals the sum of K(X, Z)s.
        with self.test_context() as session:
            X = tf.placeholder(tf.float64)
            Z = tf.placeholder(tf.float64)
            X_data = self.rng.randn(10, 1)
            Z_data = self.rng.randn(12, 1)
            evaluated = []
            for kernel in self.kernels:
                kernel.compile()
                evaluated.append(session.run(kernel(X, Z), feed_dict={X: X_data, Z: Z_data}))
            rbf_K, lin_K, sum_K = evaluated
            self.assertTrue(np.allclose(rbf_K + lin_K, sum_K))
class TestWhite(GPflowTestCase):
    """
    The white kernel should not give the same result when called with k(X) and
    k(X, X): the symmetric and cross-covariance code paths differ by design.
    """

    def test(self):
        with self.test_context() as session:
            rng = np.random.RandomState(0)
            placeholder = tf.placeholder(tf.float64)
            data = rng.randn(10, 1)
            white = gpflow.kernels.White(1)
            white.compile()
            feed = {placeholder: data}
            symmetric = session.run(white(placeholder), feed_dict=feed)
            asymmetric = session.run(white(placeholder, placeholder), feed_dict=feed)
            self.assertFalse(np.allclose(symmetric, asymmetric))
class TestSlice(GPflowTestCase):
    """
    Make sure the results of a sliced kernel is the same as an unsliced kernel
    with correctly sliced data...
    """

    def kernels(self):
        # Return, for each basic kernel class, a triple of factories:
        # active on dim 0, active on dim 1, and active on slice(0, 1).
        ks = [gpflow.kernels.Constant,
              gpflow.kernels.Linear,
              gpflow.kernels.Polynomial]
        ks += gpflow.kernels.Stationary.__subclasses__()
        kernels = []
        kernname = lambda cls, index: '_'.join([cls.__name__, str(index)])
        for kernclass in ks:
            kern = copy.deepcopy(kernclass)
            # Bug fix: bind `kern` as a default argument. A plain closure is
            # late-bound, so every factory triple previously built kernels
            # from the LAST class in the loop rather than its own class.
            k1 = lambda kern=kern: kern(1, active_dims=[0], name=kernname(kern, 1))
            k2 = lambda kern=kern: kern(1, active_dims=[1], name=kernname(kern, 2))
            k3 = lambda kern=kern: kern(1, active_dims=slice(0, 1), name=kernname(kern, 3))
            kernels.append([k1, k2, k3])
        return kernels

    def test_symm(self):
        for k1, k2, k3 in self.kernels():
            with self.test_context(graph=tf.Graph()):
                rng = np.random.RandomState(0)
                X = rng.randn(20, 2)
                k1i, k2i, k3i = k1(), k2(), k3()
                K1 = k1i.compute_K_symm(X)
                K2 = k2i.compute_K_symm(X)
                # k3i slices column 0 of whatever it is given, so feeding
                # X[:, :1] / X[:, 1:] selects original columns 0 / 1.
                K3 = k3i.compute_K_symm(X[:, :1])
                K4 = k3i.compute_K_symm(X[:, 1:])
                self.assertTrue(np.allclose(K1, K3))
                self.assertTrue(np.allclose(K2, K4))

    def test_asymm(self):
        for k1, k2, k3 in self.kernels():
            with self.test_context(graph=tf.Graph()):
                rng = np.random.RandomState(0)
                X = rng.randn(20, 2)
                Z = rng.randn(10, 2)
                k1i, k2i, k3i = k1(), k2(), k3()
                K1 = k1i.compute_K(X, Z)
                K2 = k2i.compute_K(X, Z)
                K3 = k3i.compute_K(X[:, :1], Z[:, :1])
                K4 = k3i.compute_K(X[:, 1:], Z[:, 1:])
                self.assertTrue(np.allclose(K1, K3))
                self.assertTrue(np.allclose(K2, K4))
class TestProd(GPflowTestCase):
    """A product kernel must equal the elementwise product of its factors."""

    def setUp(self):
        self.test_graph = tf.Graph()
        with self.test_context():
            matern32 = gpflow.kernels.Matern32(2)
            matern52 = gpflow.kernels.Matern52(2, lengthscales=0.3)
            product = matern32 * matern52
            self.kernels = [matern32, matern52, product]

    def tearDown(self):
        GPflowTestCase.tearDown(self)
        # Release the product kernel's compiled resources.
        self.kernels[2].clear()

    def test_prod(self):
        with self.test_context() as session:
            self.kernels[2].compile()
            X = tf.placeholder(tf.float64, [30, 2])
            X_data = np.random.randn(30, 2)
            gram_matrices = [session.run(kernel(X), feed_dict={X: X_data})
                             for kernel in self.kernels]
            m32_K, m52_K, prod_K = gram_matrices
            self.assertTrue(np.allclose(m32_K * m52_K, prod_K))
class TestARDActiveProd(GPflowTestCase):
    """A product of ARD kernels on disjoint active_dims must equal a single
    full-dimensional ARD kernel with the matching lengthscales."""

    def setUp(self):
        self.test_graph = tf.Graph()
        self.rng = np.random.RandomState(0)
        with self.test_context():
            # k3 = k1 * k2
            # k1 covers dims [0, 1, 3], k2 covers dim [2]; together they span
            # all 4 dims of k3, with lengthscales arranged to match per-dim.
            self.k1 = gpflow.kernels.RBF(3, active_dims=[0, 1, 3], ARD=True)
            self.k2 = gpflow.kernels.RBF(1, active_dims=[2], ARD=True)
            self.k3 = gpflow.kernels.RBF(4, ARD=True)
            self.k1.lengthscales = np.array([3.4, 4.5, 5.6])
            self.k2.lengthscales = np.array([6.7])
            # dim order in k3: [0, 1, 2, 3] -> lengthscales [3.4, 4.5, 6.7, 5.6]
            self.k3.lengthscales = np.array([3.4, 4.5, 6.7, 5.6])
            self.k3a = self.k1 * self.k2

    def test(self):
        # Evaluate both the direct kernel and the product and compare.
        with self.test_context() as session:
            X = tf.placeholder(tf.float64, [50, 4])
            X_data = np.random.randn(50, 4)
            self.k3.compile()
            self.k3a.compile()
            K1 = self.k3(X)
            K2 = self.k3a(X)
            K1 = session.run(K1, feed_dict={X: X_data})
            K2 = session.run(K2, feed_dict={X: X_data})
            self.assertTrue(np.allclose(K1, K2))
class TestARDInit(GPflowTestCase):
    """
    For ARD kernels, make sure that kernels can be instantiated with a single
    lengthscale or a suitable array of lengthscales
    """

    def setUp(self):
        self.test_graph = tf.Graph()

    def test_scalar(self):
        # A scalar lengthscale must produce the same parameter values as an
        # explicit constant vector of the right size.
        with self.test_context():
            scalar_kern = gpflow.kernels.RBF(3, lengthscales=2.3)
            vector_kern = gpflow.kernels.RBF(3, lengthscales=np.ones(3) * 2.3, ARD=True)
            same = scalar_kern.lengthscales.read_value() == vector_kern.lengthscales.read_value()
            self.assertTrue(np.all(same))

    def test_init(self):
        # A lengthscales array whose size disagrees with input_dim must be
        # rejected, whatever the ARD flag says.
        for ARD in (False, True, None):
            with self.assertRaises(ValueError):
                gpflow.kernels.RBF(1, lengthscales=[1., 1.], ARD=ARD)
            with self.assertRaises(ValueError):
                gpflow.kernels.RBF(2, lengthscales=[1., 1., 1.], ARD=ARD)

    def test_MLP(self):
        # Same scalar-vs-vector check for ArcCosine's weight variances.
        with self.test_context():
            scalar_kern = gpflow.kernels.ArcCosine(3, weight_variances=1.23, ARD=True)
            vector_kern = gpflow.kernels.ArcCosine(3, weight_variances=np.ones(3) * 1.23, ARD=True)
            same = scalar_kern.weight_variances.read_value() == vector_kern.weight_variances.read_value()
            self.assertTrue(np.all(same))
# Run the TestCase-based tests via TensorFlow's test runner when executed directly.
if __name__ == "__main__":
    tf.test.main()
Computing file changes ...