# Copyright 2017 the GPflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import itertools

import tensorflow as tf
import numpy as np
from numpy.testing import assert_allclose

import gpflow
from gpflow.test_util import GPflowTestCase
from gpflow import settings


class TestMeanFuncs(GPflowTestCase):
    """
    Test the output shape of basic and compositional mean functions, and
    check that combining mean functions returns the correct class.
    """
    input_dim = 3
    output_dim = 2
    N = 20

    def mfs1(self):
        rng = np.random.RandomState(0)
        return [gpflow.mean_functions.Zero(),
                gpflow.mean_functions.Linear(
                    rng.randn(self.input_dim, self.output_dim).astype(settings.float_type),
                    rng.randn(self.output_dim).astype(settings.float_type)),
                gpflow.mean_functions.Constant(
                    rng.randn(self.output_dim).astype(settings.float_type))]

    def mfs2(self):
        rng = np.random.RandomState(0)
        return [gpflow.mean_functions.Zero(),
                gpflow.mean_functions.Linear(
                    rng.randn(self.input_dim, self.output_dim).astype(settings.float_type),
                    rng.randn(self.output_dim).astype(settings.float_type)),
                gpflow.mean_functions.Constant(
                    rng.randn(self.output_dim).astype(settings.float_type))]

    def composition_mfs_add(self):
        composition_mfs_add = []
        for mean_f1, mean_f2 in itertools.product(self.mfs1(), self.mfs2()):
            composition_mfs_add.append(mean_f1 + mean_f2)
        return composition_mfs_add

    def composition_mfs_mult(self):
        composition_mfs_mult = []
        for mean_f1, mean_f2 in itertools.product(self.mfs1(), self.mfs2()):
            composition_mfs_mult.append(mean_f1 * mean_f2)
        return composition_mfs_mult

    def composition_mfs(self):
        return self.composition_mfs_add() + self.composition_mfs_mult()

    def test_basic_output_shape(self):
        with self.test_context() as sess:
            X = tf.placeholder(settings.float_type, shape=[self.N, self.input_dim])
            X_data = np.random.randn(self.N, self.input_dim).astype(settings.float_type)
            for mf in self.mfs1():
                mf.compile()
                Y = sess.run(mf(X), feed_dict={X: X_data})
                self.assertTrue(Y.shape in [(self.N, self.output_dim), (self.N, 1)])

    def test_add_output_shape(self):
        with self.test_context() as sess:
            X = tf.placeholder(settings.float_type, [self.N, self.input_dim])
            X_data = np.random.randn(self.N, self.input_dim).astype(settings.float_type)
            for comp_mf in self.composition_mfs_add():
                comp_mf.compile()
                Y = sess.run(comp_mf(X), feed_dict={X: X_data})
                self.assertTrue(Y.shape in [(self.N, self.output_dim), (self.N, 1)])

    def test_mult_output_shape(self):
        with self.test_context() as sess:
            X = tf.placeholder(settings.float_type, [self.N, self.input_dim])
            X_data = np.random.randn(self.N, self.input_dim).astype(settings.float_type)
            for comp_mf in self.composition_mfs_mult():
                comp_mf.compile()
                Y = sess.run(comp_mf(X), feed_dict={X: X_data})
                self.assertTrue(Y.shape in [(self.N, self.output_dim), (self.N, 1)])
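
# A minimal sketch of the operator overloading exercised by these tests
# (kept as a comment so the module stays import-safe; Constant() without
# arguments is assumed to default to a zero constant):
#
#     mf_sum = gpflow.mean_functions.Zero() + gpflow.mean_functions.Constant()
#     assert isinstance(mf_sum, gpflow.mean_functions.Additive)
#     mf_prod = gpflow.mean_functions.Zero() * gpflow.mean_functions.Constant()
#     assert isinstance(mf_prod, gpflow.mean_functions.Product)
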
    def test_composition_output_shape(self):
        with self.test_context() as sess:
            X = tf.placeholder(settings.float_type, [self.N, self.input_dim])
            X_data = np.random.randn(self.N, self.input_dim).astype(settings.float_type)
            # Only a single representative composition is checked here; the
            # loop over all compositions is left disabled for speed:
            # for comp_mf in self.composition_mfs():
            comp_mf = self.composition_mfs()[1]
            comp_mf.compile()
            Y = sess.run(comp_mf(X), feed_dict={X: X_data})
            self.assertTrue(Y.shape in [(self.N, self.output_dim), (self.N, 1)])

    def test_combination_types(self):
        with self.test_context():
            self.assertTrue(all(isinstance(mf, gpflow.mean_functions.Additive)
                                for mf in self.composition_mfs_add()))
            self.assertTrue(all(isinstance(mf, gpflow.mean_functions.Product)
                                for mf in self.composition_mfs_mult()))


class TestModelCompositionOperations(GPflowTestCase):
    """
    Test that operator precedence is respected, and that identity operations
    (adding zero, multiplying by one, adding x and then subtracting it again,
    etc.) do not change the mean function.
    """
    input_dim = 3
    output_dim = 2
    N = 20
    rng = np.random.RandomState(0)
    Xtest = rng.randn(30, 3).astype(settings.float_type)

    def initials(self):
        # Two copies of linear1 (linear1_1, linear1_2) are needed, since the
        # same parameter cannot be added twice to a single tree.
        self.rng.seed(seed=0)
        linear1_1 = gpflow.mean_functions.Linear(
            self.rng.randn(self.input_dim, self.output_dim).astype(settings.float_type),
            self.rng.randn(self.output_dim).astype(settings.float_type))
        self.rng.seed(seed=0)
        linear1_2 = gpflow.mean_functions.Linear(
            self.rng.randn(self.input_dim, self.output_dim).astype(settings.float_type),
            self.rng.randn(self.output_dim).astype(settings.float_type))
        self.rng.seed(seed=1)
        linear2 = gpflow.mean_functions.Linear(
            self.rng.randn(self.input_dim, self.output_dim).astype(settings.float_type),
            self.rng.randn(self.output_dim).astype(settings.float_type))
        linear3 = gpflow.mean_functions.Linear(
            self.rng.randn(self.input_dim, self.output_dim).astype(settings.float_type),
            self.rng.randn(self.output_dim).astype(settings.float_type))
        linears = (linear1_1, linear1_2, linear2, linear3)

        # Likewise, two copies of const1 are needed, since the same parameter
        # cannot be added twice to a single tree.
        self.rng.seed(seed=2)
        const1_1 = gpflow.mean_functions.Constant(
            self.rng.randn(self.output_dim).astype(settings.float_type))
        self.rng.seed(seed=2)
        const1_2 = gpflow.mean_functions.Constant(
            self.rng.randn(self.output_dim).astype(settings.float_type))
        self.rng.seed(seed=3)
        const2 = gpflow.mean_functions.Constant(
            self.rng.randn(self.output_dim).astype(settings.float_type))
        const3 = gpflow.mean_functions.Constant(
            self.rng.randn(self.output_dim).astype(settings.float_type))
        consts = (const1_1, const1_2, const2, const3)

        const1inv = gpflow.mean_functions.Constant(const1_1.c.read_value() * -1)
        linear1inv = gpflow.mean_functions.Linear(
            A=(linear1_1.A.read_value() * -1.),
            b=(linear1_2.b.read_value() * -1.))
        invs = (linear1inv, const1inv)
        return linears, consts, invs

    def a_b_plus_c(self):
        # a * (b + c)
        linears, consts, _ = self.initials()
        linear1_1, _linear1_2, linear2, linear3 = linears
        const1_1, _const1_2, const2, const3 = consts
        const_set1 = gpflow.mean_functions.Product(
            const1_1, gpflow.mean_functions.Additive(const2, const3))
        linear_set1 = gpflow.mean_functions.Product(
            linear1_1, gpflow.mean_functions.Additive(linear2, linear3))
        return const_set1, linear_set1

    def ab_plus_ac(self):
        # a*b + a*c
        linears, consts, _ = self.initials()
        linear1_1, linear1_2, linear2, linear3 = linears
        const1_1, const1_2, const2, const3 = consts
        const_set2 = gpflow.mean_functions.Additive(
            gpflow.mean_functions.Product(const1_1, const2),
            gpflow.mean_functions.Product(const1_2, const3))
        linear_set2 = gpflow.mean_functions.Additive(
            gpflow.mean_functions.Product(linear1_1, linear2),
            gpflow.mean_functions.Product(linear1_2, linear3))
        return const_set2, linear_set2
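
    # a_b_plus_c and ab_plus_ac build a * (b + c) and a*b + a*c respectively;
    # since mean-function arithmetic should distribute, both encode the same
    # function, which test_precedence below verifies through GPR predictions.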
    def a_minus_a(self):
        # a - a = 0
        linears, consts, invs = self.initials()
        linear1_1, _linear1_2, _linear2, _linear3 = linears
        const1_1, _const1_2, _const2, _const3 = consts
        linear1inv, const1inv = invs
        linear1_minus_linear1 = gpflow.mean_functions.Additive(linear1_1, linear1inv)
        const1_minus_const1 = gpflow.mean_functions.Additive(const1_1, const1inv)
        return linear1_minus_linear1, const1_minus_const1

    def comp_minus_constituent1(self):
        # (a + b) - a = b = a + (b - a)
        linears, _consts, invs = self.initials()
        linear1_1, _linear1_2, linear2, _linear3 = linears
        linear1inv, _const1inv = invs
        comp_minus_constituent1 = gpflow.mean_functions.Additive(
            gpflow.mean_functions.Additive(linear1_1, linear2), linear1inv)
        return comp_minus_constituent1

    def comp_minus_constituent2(self):
        linears, _consts, invs = self.initials()
        linear1_1, _linear1_2, linear2, _linear3 = linears
        linear1inv, _const1inv = invs
        comp_minus_constituent2 = gpflow.mean_functions.Additive(
            linear1_1, gpflow.mean_functions.Additive(linear2, linear1inv))
        return comp_minus_constituent2

    def setUp(self):
        self.test_graph = tf.Graph()
        with self.test_context():
            zero = gpflow.mean_functions.Zero()
            k = gpflow.kernels.Bias(self.input_dim)
            const_set1, linear_set1 = self.a_b_plus_c()
            const_set2, linear_set2 = self.ab_plus_ac()
            linear1_minus_linear1, const1_minus_const1 = self.a_minus_a()
            comp_minus_constituent1 = self.comp_minus_constituent1()
            comp_minus_constituent2 = self.comp_minus_constituent2()
            _linear1_1, _linear1_2, linear2, _linear3 = self.initials()[0]

            X = self.rng.randn(self.N, self.input_dim).astype(settings.float_type)
            Y = self.rng.randn(self.N, self.output_dim).astype(settings.float_type)

            self.m_linear_set1 = gpflow.models.GPR(X, Y, mean_function=linear_set1, kern=k)
            self.m_linear_set2 = gpflow.models.GPR(X, Y, mean_function=linear_set2, kern=k)
            self.m_const_set1 = gpflow.models.GPR(X, Y, mean_function=const_set1, kern=k)
            self.m_const_set2 = gpflow.models.GPR(X, Y, mean_function=const_set2, kern=k)
            self.m_linear_min_linear = gpflow.models.GPR(
                X, Y, mean_function=linear1_minus_linear1, kern=k)
            self.m_const_min_const = gpflow.models.GPR(
                X, Y, mean_function=const1_minus_const1, kern=k)
            self.m_constituent = gpflow.models.GPR(X, Y, mean_function=linear2, kern=k)
            self.m_zero = gpflow.models.GPR(X, Y, mean_function=zero, kern=k)
            self.m_comp_minus_constituent1 = gpflow.models.GPR(
                X, Y, mean_function=comp_minus_constituent1, kern=k)
            self.m_comp_minus_constituent2 = gpflow.models.GPR(
                X, Y, mean_function=comp_minus_constituent2, kern=k)

    def test_precedence(self):
        with self.test_context():
            mu1_lin, v1_lin = self.m_linear_set1.predict_f(self.Xtest)
            mu2_lin, v2_lin = self.m_linear_set2.predict_f(self.Xtest)
            mu1_const, v1_const = self.m_const_set1.predict_f(self.Xtest)
            mu2_const, v2_const = self.m_const_set2.predict_f(self.Xtest)
            assert_allclose(v1_lin, v2_lin)
            assert_allclose(mu1_lin, mu2_lin)
            assert_allclose(v1_const, v2_const)
            assert_allclose(mu1_const, mu2_const)

    def test_inverse_operations(self):
        with self.test_context():
            mu1_lin_min_lin, _v1_lin_min_lin = \
                self.m_linear_min_linear.predict_f(self.Xtest)
            mu1_const_min_const, _v1_const_min_const = \
                self.m_const_min_const.predict_f(self.Xtest)
            mu1_comp_min_constituent1, _v1_comp_min_constituent1 = \
                self.m_comp_minus_constituent1.predict_f(self.Xtest)
            mu1_comp_min_constituent2, _v1_comp_min_constituent2 = \
                self.m_comp_minus_constituent2.predict_f(self.Xtest)
            mu_const, _ = self.m_constituent.predict_f(self.Xtest)
            mu_zero, _v_zero = self.m_zero.predict_f(self.Xtest)
            self.assertTrue(np.all(np.isclose(mu1_lin_min_lin, mu_zero)))
            self.assertTrue(np.all(np.isclose(mu1_const_min_const, mu_zero)))
            assert_allclose(mu1_comp_min_constituent1, mu_const)
            assert_allclose(mu1_comp_min_constituent2, mu_const)
            assert_allclose(mu1_comp_min_constituent1, mu1_comp_min_constituent2)
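
# Sketch of the property checked by TestModelsWithMeanFuncs below (a comment
# rather than executable code; the shift by 10 matches mf1 in its setUp):
# swapping Zero() for Constant(10 * np.ones(output_dim)) changes the
# predictive means returned by predict_f but leaves the predictive variances
# identical, since the mean function does not enter the covariance:
#
#     m0 = gpflow.models.GPR(X, Y, kern=k(), mean_function=mf0())
#     m1 = gpflow.models.GPR(X, Y, kern=k(), mean_function=mf1())
#     mu0, v0 = m0.predict_f(Xtest)
#     mu1, v1 = m1.predict_f(Xtest)
#     # v0 == v1 everywhere, while mu0 != mu1
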
class TestModelsWithMeanFuncs(GPflowTestCase):
    """
    Check that every model predicts a different mean with a constant mean
    function than with a zero mean function, while the predictive variance
    stays the same. For compositions of mean functions, check that adding or
    multiplying by a constant changes the prediction, whereas adding zero or
    multiplying by one does not.
    """

    def setUp(self):
        self.input_dim = 3
        self.output_dim = 2
        self.N = 20
        self.Ntest = 30
        self.M = 5
        rng = np.random.RandomState(0)
        X, Y, Z, self.Xtest = (
            rng.randn(self.N, self.input_dim).astype(settings.float_type),
            rng.randn(self.N, self.output_dim).astype(settings.float_type),
            rng.randn(self.M, self.input_dim).astype(settings.float_type),
            rng.randn(self.Ntest, self.input_dim).astype(settings.float_type))
        with self.test_context():
            k = lambda: gpflow.kernels.Matern32(self.input_dim)
            lik = lambda: gpflow.likelihoods.Gaussian()
            mf0 = lambda: gpflow.mean_functions.Zero()
            mf1 = lambda: gpflow.mean_functions.Constant(np.ones(self.output_dim) * 10)
            self.models_with, self.models_without = [
                [gpflow.models.GPR(X, Y, mean_function=mf(), kern=k()),
                 gpflow.models.SGPR(X, Y, mean_function=mf(), Z=Z, kern=k()),
                 gpflow.models.GPRFITC(X, Y, mean_function=mf(), Z=Z, kern=k()),
                 gpflow.models.SVGP(X, Y, mean_function=mf(), Z=Z, kern=k(), likelihood=lik()),
                 gpflow.models.VGP(X, Y, mean_function=mf(), kern=k(), likelihood=lik()),
                 gpflow.models.GPMC(X, Y, mean_function=mf(), kern=k(), likelihood=lik()),
                 gpflow.models.SGPMC(X, Y, mean_function=mf(), kern=k(), likelihood=lik(), Z=Z)]
                for mf in (mf1, mf0)]

    def test_basic_mean_function(self):
        with self.test_context():
            for m_with, m_without in zip(self.models_with, self.models_without):
                m_with.compile()
                m_without.compile()
                mu1, v1 = m_with.predict_f(self.Xtest)
                mu2, v2 = m_without.predict_f(self.Xtest)
                self.assertTrue(np.all(v1 == v2))
                self.assertFalse(np.all(mu1 == mu2))


class TestSwitchedMeanFunction(GPflowTestCase):
    """
    Test for the SwitchedMeanFunction.
    """

    def test(self):
        with self.test_context() as sess:
            rng = np.random.RandomState(0)
            # The last column of X holds the switch index (0 or 1).
            X = np.hstack([rng.randn(10, 3),
                           1.0 * rng.randint(0, 2, 10).reshape(-1, 1)])
            zeros = gpflow.mean_functions.Constant(np.zeros(1))
            ones = gpflow.mean_functions.Constant(np.ones(1))
            switched_mean = gpflow.mean_functions.SwitchedMeanFunction([zeros, ones])
            switched_mean.compile()
            result = sess.run(switched_mean(X))
            np_list = np.array([0., 1.])
            result_ref = (np_list[X[:, 3].astype(np.int)]).reshape(-1, 1)
            assert_allclose(result, result_ref)


class TestBug277Regression(GPflowTestCase):
    """
    Regression test for GitHub issue #277: parameters of separately
    constructed Linear mean functions must not be tied together.
    """

    def test(self):
        with self.test_context():
            m1 = gpflow.mean_functions.Linear()
            m2 = gpflow.mean_functions.Linear()
            self.assertTrue(m1.b.read_value() == m2.b.read_value())
            m1.b = [1.]
            self.assertFalse(m1.b.read_value() == m2.b.read_value())


if __name__ == "__main__":
    tf.test.main()