# Copyright 2016 the GPflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests exercising GPflow's Param, Parameterized and ParamList objects."""
from __future__ import print_function

from functools import reduce
import unittest
import GPflow
import tensorflow as tf
import numpy as np
from GPflow import settings

float_type = settings.dtypes.float_type
# NumPy dtype matching the TensorFlow float type configured in GPflow.
np_float_type = np.float32 if float_type is tf.float32 else np.float64

# Python 2 ships the faster cPickle; Python 3 only has pickle.
try:
    import cPickle as pickle
except ImportError:
    import pickle


class NamingTests(unittest.TestCase):
    """Checks on the ``name`` attribute of a Param that is not in a tree."""

    def test_unnamed(self):
        p = GPflow.param.Param(1)
        self.assertTrue(p.name == 'unnamed')

    def test_bad_parent(self):
        p = GPflow.param.Param(1)
        m = GPflow.model.Model()
        p._parent = m  # do not do this.
        with self.assertRaises(ValueError):
            print(p.name)


class ParamTestsScalar(unittest.TestCase):
    """Tests on a Parameterized object holding a single scalar Param ``p``."""

    def setUp(self):
        tf.reset_default_graph()
        self.m = GPflow.param.Parameterized()
        self.m.p = GPflow.param.Param(1.0)

    def testAssign(self):
        self.m.p = 2.0
        self.assertTrue(isinstance(self.m.p, GPflow.param.Param))
        self.assertTrue(self.m.get_free_state() == 2.0)

    def testValue(self):
        # make sure the correct value is returned
        self.m.p = 3.0
        self.assertTrue(isinstance(self.m.p.value, np.ndarray))

        # make sure assignment does not work
        with self.assertRaises(AttributeError):
            self.m.p.value = 2.53

        # make sure we get a copy
        self.assertFalse(self.m.p.value is self.m.p._array)

    def testReplacement(self):
        old_p = self.m.p
        new_p = GPflow.param.Param(3.0)
        self.m.p = new_p
        # Parameterized instances should not have _needs_recompile
        self.assertFalse(hasattr(self.m, '_needs_recompile'))
        self.assertFalse(old_p.highest_parent is self.m)

    def testHighestParent(self):
        self.assertTrue(self.m.p.highest_parent is self.m)

    def testName(self):
        self.assertTrue(self.m.p.name == 'p')

    def testFixing(self):
        self.m.p.fixed = False
        self.m.fixed = True
        self.assertTrue(self.m.p.fixed)
        self.m.p.fixed = False
        self.assertFalse(self.m.fixed)

    def testFixedFreeState(self):
        self.assertTrue(len(self.m.get_free_state()) == 1)
        self.m.set_state(np.ones(1))
        self.m.fixed = True
        self.assertTrue(len(self.m.get_free_state()) == 0)
        self.m.set_state(np.ones(0))

    def testMakeTF(self):
        x = tf.placeholder('float64')
        l = self.m.make_tf_array(x)
        self.assertTrue(l == 1)
        l = self.m.p.make_tf_array(x)
        self.assertTrue(l == 1)

    def testFreeState(self):
        xx = self.m.get_free_state()
        self.assertTrue(np.allclose(xx, np.ones(1)))

        y = np.array([34.0], np_float_type)
        self.m.set_state(y)
        self.assertTrue(np.allclose(self.m.get_free_state(), y))

    def testFixed(self):
        self.m.p.fixed = True
        self.assertTrue(len(self.m.get_free_state()) == 0)
        self.assertTrue(self.m.make_tf_array(tf.placeholder(float_type)) == 0)

    def testRecompile(self):
        self.m._needs_recompile = False
        self.m.p.fixed = True
        self.assertTrue(self.m._needs_recompile)

        self.m._needs_recompile = False
        self.m.p.prior = GPflow.priors.Gaussian(0, 1)
        self.assertTrue(self.m._needs_recompile)

    def testTFMode(self):
        x = tf.placeholder('float64')
        self.m.make_tf_array(x)
        self.assertTrue(isinstance(self.m.p, GPflow.param.Param))
        with self.m.tf_mode():
            self.assertTrue(isinstance(self.m.p, tf.Tensor))


class ParamTestsDeeper(unittest.TestCase):
    """Tests on nested Parameterized objects (``m.foo.bar.baz``) ending in a scalar Param."""

    def setUp(self):
        tf.reset_default_graph()
        self.m = GPflow.param.Parameterized()
        self.m.foo = GPflow.param.Parameterized()
        self.m.foo.bar = GPflow.param.Parameterized()
        self.m.foo.bar.baz = GPflow.param.Param(1.0)

    def testHighestParent(self):
        self.assertTrue(self.m.foo.highest_parent is self.m)
        self.assertTrue(self.m.foo.bar.highest_parent is self.m)
        self.assertTrue(self.m.foo.bar.baz.highest_parent is self.m)

    def testReplacement(self):
        old_p = self.m.foo.bar.baz
        new_p = GPflow.param.Param(3.0)
        self.m.foo.bar.baz = new_p
        # Parameterized instances should not have _needs_recompile
        self.assertFalse(hasattr(self.m, '_needs_recompile'))
        self.assertFalse(old_p.highest_parent is self.m)

    def testReplacement2(self):
        old_p = self.m.foo.bar
        new_p = GPflow.param.Parameterized()
        new_p.baz = GPflow.param.Param(3.0)
        self.m.foo.bar = new_p
        self.assertTrue(new_p.baz.highest_parent is self.m)
        self.assertFalse(old_p.highest_parent is self.m)

    def testName(self):
        self.assertTrue(self.m.foo.name == 'foo')
        self.assertTrue(self.m.foo.bar.name == 'bar')
        self.assertTrue(self.m.foo.bar.baz.name == 'baz')

    def testMakeTF(self):
        x = tf.placeholder('float64')

        l = self.m.make_tf_array(x)
        self.assertTrue(l == 1)

        l = self.m.foo.make_tf_array(x)
        self.assertTrue(l == 1)

        l = self.m.foo.bar.make_tf_array(x)
        self.assertTrue(l == 1)

        l = self.m.foo.bar.baz.make_tf_array(x)
        self.assertTrue(l == 1)

    def testFreeState(self):
        xx = self.m.get_free_state()
        self.assertTrue(np.allclose(xx, np.ones(1)))

        y = np.array([34.0], np_float_type)
        self.m.set_state(y)
        self.assertTrue(np.allclose(self.m.get_free_state(), y))

    def testFixed(self):
        self.m.foo.bar.baz.fixed = True
        self.assertTrue(len(self.m.get_free_state()) == 0)

    def testFixing(self):
        self.m.fixed = False
        self.m.foo.bar.fixed = True
        self.assertTrue(self.m.fixed)
        self.assertTrue(self.m.foo.fixed)
        self.assertTrue(self.m.foo.bar.fixed)
        self.assertTrue(self.m.foo.bar.baz.fixed)
        self.m.foo.bar.baz.fixed = False
        self.assertFalse(self.m.fixed)
        self.assertFalse(self.m.foo.fixed)
        self.assertFalse(self.m.foo.bar.fixed)
        self.assertFalse(self.m.foo.bar.baz.fixed)

    def testRecompile(self):
        self.m._needs_recompile = False
        self.m.foo.bar.baz.fixed = True
        self.assertTrue(self.m._needs_recompile)

        self.m._needs_recompile = False
        self.m.foo.bar.baz.prior = GPflow.priors.Gaussian(0, 1)
        self.assertTrue(self.m._needs_recompile)

    def testTFMode(self):
        x = tf.placeholder('float64')

        self.m.make_tf_array(x)
        self.assertTrue(isinstance(self.m.foo.bar.baz, GPflow.param.Param))
        with self.m.tf_mode():
            self.assertTrue(isinstance(self.m.foo.bar.baz, tf.Tensor))


class ParamTestsWider(unittest.TestCase):
    """Tests on a Parameterized object holding three Params.

    The Params are a scalar (``foo``), a length-10 vector (``bar``) and a
    3x3 matrix (``baz``), i.e. 20 values in total.
    """

    def setUp(self):
        tf.reset_default_graph()
        self.m = GPflow.param.Parameterized()
        self.m.foo = GPflow.param.Param(1.0)
        self.m.bar = GPflow.param.Param(np.arange(10))
        self.m.baz = GPflow.param.Param(np.random.randn(3, 3))

    def testHighestParent(self):
        self.assertTrue(self.m.foo.highest_parent is self.m)
        self.assertTrue(self.m.bar.highest_parent is self.m)
        self.assertTrue(self.m.baz.highest_parent is self.m)

    def testName(self):
        self.assertTrue(self.m.foo.name == 'foo')
        self.assertTrue(self.m.bar.name == 'bar')
        self.assertTrue(self.m.baz.name == 'baz')

    def testMakeTF(self):
        x = tf.placeholder('float64')

        l = self.m.make_tf_array(x)
        self.assertTrue(l == 20)

        l = self.m.foo.make_tf_array(x)
        self.assertTrue(l == 1)

        l = self.m.bar.make_tf_array(x)
        self.assertTrue(l == 10)

        l = self.m.baz.make_tf_array(x)
        self.assertTrue(l == 9)

    def testFreeState(self):
        xx = self.m.get_free_state()
        self.assertTrue(len(xx) == 20)

        y = np.random.randn(20)
        self.m.set_state(y)
        self.assertTrue(np.allclose(self.m.get_free_state(), y))

    def testIndexParam(self):
        fs = self.m.get_free_state()
        for p in [self.m.foo, self.m.bar, self.m.baz]:
            index, found = self.m.get_param_index(p)
            self.assertTrue(found)
            self.assertTrue(fs[index] == p.get_free_state()[0])

    def testFixed(self):
        self.m.foo.fixed = True
        self.assertTrue(len(self.m.get_free_state()) == 19)

        self.m.foo.fixed = False
        self.m.bar.fixed = True
        self.assertTrue(len(self.m.get_free_state()) == 10)

    def testFixing(self):
        self.m.fixed = False
        self.m.foo.fixed = True
        self.assertFalse(self.m.fixed)
        self.assertTrue(self.m.foo.fixed)
        self.assertFalse(self.m.bar.fixed)
        self.assertFalse(self.m.baz.fixed)
        self.m.bar.fixed = True
        self.m.baz.fixed = True
        self.assertTrue(self.m.fixed)
        self.assertTrue(self.m.foo.fixed)
        self.assertTrue(self.m.bar.fixed)
        self.assertTrue(self.m.baz.fixed)

    def testRecompile(self):
        self.m._needs_recompile = False
        self.m.foo.fixed = True
        self.assertTrue(self.m._needs_recompile)

        self.m._needs_recompile = False
        self.m.bar.prior = GPflow.priors.Gaussian(0, 1)
        self.assertTrue(self.m._needs_recompile)

    def testTFMode(self):
        x = tf.placeholder('float64')
        self.m.make_tf_array(x)
        self.assertTrue(all(isinstance(p, GPflow.param.Param)
                            for p in (self.m.foo, self.m.bar, self.m.baz)))
        with self.m.tf_mode():
            self.assertTrue(all(isinstance(p, tf.Tensor)
                                for p in (self.m.foo, self.m.bar, self.m.baz)))


class SingleParamterizedInvariantTest(unittest.TestCase):
    """
    Tests the invariant that a given Parameterized object is referenced
    only once in a tree
    """

    def setUp(self):
        tf.reset_default_graph()

    def testSelfReference(self):
        """
        Test we raise when a Parameterized object references itself
        """
        m = GPflow.param.Parameterized()

        with self.assertRaises(ValueError):
            m.foo = m

    def testReferenceBelow(self):
        """
        Test we raise when we reference the same Parameterized object
        in a descendant node
        """
        m = GPflow.param.Parameterized()
        m.foo = GPflow.param.Parameterized()

        with self.assertRaises(ValueError):
            m.foo.bar = m

    def testReferenceAbove(self):
        """
        Test we raise when we reference the same Parameterized object
        in an ancestor node
        """
        m = GPflow.param.Parameterized()
        m.foo = GPflow.param.Parameterized()
        m.foo.bar = GPflow.param.Parameterized()

        with self.assertRaises(ValueError):
            m.baz = m.foo.bar

    def testReferenceAccross(self):
        """
        Test we raise when we reference the same Parameterized object
        in a sibling node
        """
        m = GPflow.param.Parameterized()
        m.foo = GPflow.param.Parameterized()
        m.foo.bar = GPflow.param.Parameterized()

        m.boo = GPflow.param.Parameterized()

        with self.assertRaises(ValueError):
            m.boo.far = m.foo.bar

    def testAddingToAnother(self):
        """
        Adding the same Parameterized object to another tree is fine.
        """
        m1 = GPflow.param.Parameterized()
        m1.foo = GPflow.param.Parameterized()

        m2 = GPflow.param.Parameterized()
        m2.foo = m1.foo

    def testReassign(self):
        """
        We should be able to reassign the same value to the same param
        """
        m1 = GPflow.param.Parameterized()
        p = GPflow.param.Parameterized()
        m1.foo = p  # assign
        m1.foo = p  # reassign


class SingleParamInvariantTest(unittest.TestCase):
    """
    Tests the invariant that a given Param object is referenced
    only once in a tree
    """

    def setUp(self):
        tf.reset_default_graph()

    def testReferenceBelow(self):
        """
        Test we raise when the same Param object is added further down the
        tree
        """
        m = GPflow.param.Parameterized()
        m.p = GPflow.param.Param(1)
        m.foo = GPflow.param.Parameterized()

        with self.assertRaises(ValueError):
            m.foo.p = m.p

    def testReferenceAbove(self):
        """
        Test we raise when we reference the same Param object in an
        ancestor node
        """
        m = GPflow.param.Parameterized()
        m.foo = GPflow.param.Parameterized()
        m.foo.p = GPflow.param.Param(1)

        with self.assertRaises(ValueError):
            m.p = m.foo.p

    def testReferenceAccross(self):
        """
        Test we raise when we reference the same Param object in a sibling
        node
        """
        m = GPflow.param.Parameterized()
        m.foo = GPflow.param.Parameterized()
        m.foo.p = GPflow.param.Param(1)

        m.bar = GPflow.param.Parameterized()

        with self.assertRaises(ValueError):
            m.bar.p = m.foo.p

    def testAddingToAnother(self):
        """
        Adding the same Param object to another tree is fine.
        """
        m1 = GPflow.param.Parameterized()
        m1.foo = GPflow.param.Param(1)

        m2 = GPflow.param.Parameterized()
        m2.foo = m1.foo

    def testReassign(self):
        """
        We should be able to reassign the same value to the same param
        """
        m1 = GPflow.param.Parameterized()
        p = GPflow.param.Param(1)
        m1.foo = p  # assign
        m1.foo = p  # reassign


class TestParamList(unittest.TestCase):
    """Tests for construction, naming, item access and use of ParamList."""

    def test_construction(self):
        GPflow.param.ParamList([])
        GPflow.param.ParamList([GPflow.param.Param(1)])
        with self.assertRaises(AssertionError):
            GPflow.param.ParamList([GPflow.param.Param(1), 'stringsnotallowed'])
        with self.assertRaises(AssertionError):
            # tuples not valid in constructor:
            GPflow.param.ParamList((GPflow.param.Param(1),))
        with self.assertRaises(AssertionError):
            # param objects not valid in constructor (must be in list)
            GPflow.param.ParamList(GPflow.param.Param(1))

    def test_naming(self):
        p1 = GPflow.param.Param(1.2)
        p2 = GPflow.param.Param(np.array([3.4, 5.6], np_float_type))
        GPflow.param.ParamList([p1, p2])
        self.assertTrue(p1.name == 'item0')
        self.assertTrue(p2.name == 'item1')

    def test_connected(self):
        p1 = GPflow.param.Param(1.2)
        p2 = GPflow.param.Param(np.array([3.4, 5.6], np_float_type))
        l = GPflow.param.ParamList([p1, p2])
        x = l.get_free_state()
        x.sort()
        self.assertTrue(np.all(x == np.array([1.2, 3.4, 5.6], np_float_type)))

    def test_setitem(self):
        p1 = GPflow.param.Param(1.2)
        p2 = GPflow.param.Param(np.array([3.4, 5.6], np_float_type))
        l = GPflow.param.ParamList([p1, p2])

        l[0] = 1.2
        self.assertTrue(p1._array == 1.2)

        l[1] = np.array([1.1, 2.2], np_float_type)
        self.assertTrue(np.all(p2._array == np.array([1.1, 2.2], np_float_type)))

        with self.assertRaises(TypeError):
            l[0] = GPflow.param.Param(12)

    def test_append(self):
        p1 = GPflow.param.Param(1.2)
        p2 = GPflow.param.Param(np.array([3.4, 5.6], np_float_type))
        l = GPflow.param.ParamList([p1])
        l.append(p2)
        self.assertTrue(p2 in l.sorted_params)

        with self.assertRaises(AssertionError):
            l.append('foo')

    def test_len(self):
        p1 = GPflow.param.Param(1.2)
        p2 = GPflow.param.Param(np.array([3.4, 5.6], np_float_type))
        l = GPflow.param.ParamList([p1])
        l.append(p2)
        self.assertTrue(len(l) == 2)

    def test_with_parameterized(self):
        pzd = GPflow.param.Parameterized()
        p = GPflow.param.Param(1.2)
        pzd.p = p
        l = GPflow.param.ParamList([pzd])

        # test assignment:
        l[0].p = 5
        self.assertTrue(l.get_free_state() == 5)

        # test to make sure tf_mode get turned on and off
        self.assertFalse(pzd._tf_mode)
        with l.tf_mode():
            self.assertTrue(pzd._tf_mode)
        self.assertFalse(pzd._tf_mode)

    def test_in_model(self):
        class Foo(GPflow.model.Model):
            def __init__(self):
                GPflow.model.Model.__init__(self)
                self.l = GPflow.param.ParamList([
                    GPflow.param.Param(1), GPflow.param.Param(12)])

            def build_likelihood(self):
                # Negative sum of squares of the list entries.
                return -reduce(tf.add, [tf.square(x) for x in self.l])

        m = Foo()
        self.assertTrue(m.get_free_state().size == 2)
        m.optimize(disp=False)
        # Use a looser tolerance when running in single precision.
        atol = 1e-6 if np_float_type is np.float32 else 1e-8
        self.assertTrue(np.allclose(m.get_free_state(), 0., atol=atol))


class TestPickleAndDict(unittest.TestCase):
    """Checks that a pickled-and-reloaded GPR model has the same parameter dict."""

    def setUp(self):
        rng = np.random.RandomState(0)
        X = rng.randn(10, 1)
        Y = rng.randn(10, 1)
        self.m = GPflow.gpr.GPR(X, Y, kern=GPflow.kernels.RBF(1))

    def test(self):
        # pickle and reload the model
        s1 = pickle.dumps(self.m)
        m1 = pickle.loads(s1)

        d1 = self.m.get_parameter_dict()
        d2 = m1.get_parameter_dict()
        for key, val in d1.items():
            # unittest assertion rather than a bare ``assert`` so the check
            # still runs when Python is started with -O.
            self.assertTrue(np.all(val == d2[key]))


class TestDictEmpty(unittest.TestCase):
    """Checks get/set of the parameter dict on a Model with no parameters."""

    def setUp(self):
        self.m = GPflow.model.Model()

    def test(self):
        d = self.m.get_parameter_dict()
        self.assertTrue(len(d.keys()) == 0)
        self.m.set_parameter_dict(d)


class TestDictSimple(unittest.TestCase):
    """Checks that the parameter dict restores the state of a two-Param Model."""

    def setUp(self):
        self.m = GPflow.model.Model()
        self.m.p1 = GPflow.param.Param(np.random.randn(3, 2))
        self.m.p2 = GPflow.param.Param(np.random.randn(10))

    def test(self):
        d = self.m.get_parameter_dict()
        self.assertTrue(len(d.keys()) == 2)
        state1 = self.m.get_free_state().copy()
        self.m.set_state(state1 * 0)
        self.m.set_parameter_dict(d)
        self.assertTrue(np.all(state1 == self.m.get_free_state()))


class TestDictSVGP(unittest.TestCase):
    """Checks that the parameter dict restores the log likelihood of an SVGP model."""

    def setUp(self):
        self.rng = np.random.RandomState(0)
        X = self.rng.randn(10, 1)
        Y = self.rng.randn(10, 1)
        Z = self.rng.randn(5, 1)
        self.m = GPflow.svgp.SVGP(X, Y, Z=Z,
                                  likelihood=GPflow.likelihoods.Gaussian(),
                                  kern=GPflow.kernels.RBF(1))

    def test(self):
        loglik1 = self.m.compute_log_likelihood()
        d = self.m.get_parameter_dict()

        # muck up the model
        self.m.set_state(self.rng.randn(self.m.get_free_state().size))
        loglik2 = self.m.compute_log_likelihood()

        # reset the model
        self.m.set_parameter_dict(d)
        loglik3 = self.m.compute_log_likelihood()

        self.assertFalse(np.allclose(loglik1, loglik2))
        self.assertTrue(np.allclose(loglik1, loglik3))


class TestFixWithPrior(unittest.TestCase):
    """
    This tests that models with a fixed parameter which has a prior continue to work
    """

    def test(self):
        m = GPflow.model.Model()
        m.p = GPflow.param.Param(1.0, GPflow.transforms.positive)
        m.pp = GPflow.param.Param(1.0, GPflow.transforms.positive)
        m.p.prior = GPflow.priors.Gamma(1, 1)
        m.pp.prior = GPflow.priors.Gamma(1, 1)
        m.p.fixed = True
        m.build_likelihood = lambda: tf.zeros([1], tf.float64)
        m.optimize(disp=1, maxiter=10)


class TestRandomizeDefault(unittest.TestCase):
    """
    This tests that distributions can sample random values without priors
    """

    def test(self):
        np.random.seed(1)
        m = GPflow.model.Model()
        m.p = GPflow.param.Param(1.0)
        m.pp = GPflow.param.Param(1.0, GPflow.transforms.Log1pe())
        m.pf = GPflow.param.Param(1.0)
        m.pf.fixed = True

        m.pmd = GPflow.param.Param(np.ones((5, 2)))
        ltr = GPflow.transforms.LowerTriangular(1, 2).forward(np.ones(2 * 10))
        m.pmd2 = GPflow.param.Param(ltr, transform=GPflow.transforms.LowerTriangular(1, 2))

        # should work as (pseudo) random vals a.s. are not 1.0
        m.p.randomize()
        self.assertFalse(m.p.value == 1.0)
        m.pp.randomize()
        self.assertFalse(m.pp.value == 1.0 or m.pp.value <= 0.0)

        # check if fixing works
        m.pf.randomize()
        self.assertTrue(m.pf.value == 1.0)
        m.pf.randomize(skipfixed=False)
        self.assertFalse(m.pf.value == 1.0)

        # check multidimensional
        pmd_shape = m.pmd.shape
        m.pmd.randomize()
        self.assertFalse(np.any(m.pmd.value == 1.0))
        # assertEqual: the assertEquals alias is deprecated and was removed
        # in Python 3.12.
        self.assertEqual(m.pmd.shape, pmd_shape)

        # check non size-preserving transform
        pmd2_shape = m.pmd2.shape
        m.pmd2.randomize()
        self.assertFalse(np.any(m.pmd2.value == 1.0))
        self.assertEqual(m.pmd2.shape, pmd2_shape)


class TestRandomizePrior(unittest.TestCase):
    """
    This tests that distributions can sample random values from priors
    """

    def test(self):
        # inspect.getargspec was removed in Python 3.11; prefer
        # getfullargspec and fall back only where it does not exist (Python 2).
        try:
            from inspect import getfullargspec as getargspec
        except ImportError:
            from inspect import getargspec

        np.random.seed(1)

        m = GPflow.model.Model()
        m.p = GPflow.param.Param(1.0)
        m.pmd = GPflow.param.Param(np.eye(5), transform=GPflow.transforms.DiagMatrix())

        # Every concrete Prior subclass exposed by GPflow.priors.
        priors = [obj for obj in GPflow.priors.__dict__.values() if
                  isinstance(obj, type) and
                  issubclass(obj, GPflow.priors.Prior) and
                  obj is not GPflow.priors.Prior]

        with self.assertRaises(NotImplementedError):
            m.p = 1.0
            m.p.prior = GPflow.priors.Prior()
            m.p.randomize()

        for prior in priors:
            signature = getargspec(prior.__init__)
            params = {}
            # Only constructor arguments without a default need a value.
            if signature.defaults is not None:
                param_names = signature.args[:-len(signature.defaults)]
            else:
                param_names = signature.args
            for param in param_names:
                # Compare by value: identity comparison against a string
                # literal only works by accident of interning.
                if param != 'self' and param not in params:
                    params[param] = 1.

            m.p = 1.0
            m.p.prior = prior(**params)
            m.pmd.prior = prior(**params)
            m.p.randomize()
            m.pmd.randomize()
            self.assertFalse(m.p.value == 1.0)
            self.assertFalse(np.any(m.pmd.value == np.ones(5)))
            self.assertTrue(m.pmd.value.shape == (5, 5))


class TestRandomizeFeedPriors(unittest.TestCase):
    """
    Test if standard randomize behavior can be overridden using
    distributions keyword.
    """

    def test(self):
        np.random.seed(1)
        m = GPflow.model.Model()
        m.p = GPflow.param.Param(1.0)
        with self.assertRaises(NotImplementedError):
            m.p.randomize(distributions={m.p: GPflow.priors.Prior()})
        m.p.randomize(distributions={m.p: GPflow.priors.Gaussian(0, 1)})
        self.assertFalse(m.p.value == 1.0)


class TestRandomizeHierarchical(unittest.TestCase):
    """
    This tests that models can randomize all contained parameters
    """

    def test(self):
        np.random.seed(1)
        m = GPflow.model.Model()
        m.p = GPflow.param.Param(1.0)
        m.p2 = GPflow.param.Param(1.0)
        m.m = GPflow.model.Model()
        m.m.p = GPflow.param.Param(1.0)
        m.m.p2 = GPflow.param.Param(1.0)

        m.p2.prior = GPflow.priors.Gaussian(0, 1)
        m.m.p2.prior = GPflow.priors.Gaussian(0, 1)
        m.randomize()

        self.assertFalse(m.p.value == 1.0)
        self.assertFalse(m.p2.value == 1.0)
        self.assertFalse(m.m.p.value == 1.0)
        self.assertFalse(m.m.p2.value == 1.0)


class TestScopes(unittest.TestCase):
    """Checks the names of tensors built by a compiled GPR model."""

    def setUp(self):
        rng = np.random.RandomState(0)
        X = rng.randn(10, 1)
        k = GPflow.kernels.RBF(1)
        Y = rng.randn(10, 1)
        self.m = GPflow.gpr.GPR(X, Y, k)
        self.m._compile()

    def test_likelihood_name(self):
        with self.m.tf_mode():
            with self.m._graph.as_default():
                l = self.m.build_likelihood()
        expected_name = self.m.name + '.build_likelihood'
        self.assertTrue(expected_name in l.name)

    def test_kern_name(self):
        with self.m.tf_mode():
            with self.m._graph.as_default():
                K = self.m.kern.K(self.m.X)
        self.assertTrue('kern.K' in K.name)


if __name__ == "__main__":
    unittest.main()