https://github.com/GPflow/GPflow
Raw File
Tip revision: f4ce06708816199b1926b627322181b74d7a75eb authored by Alexander G. de G. Matthews on 30 August 2017, 11:28:47 UTC
Merge pull request #496 from GPflow/artemav/release-update
Tip revision: f4ce067
test_param.py
# Copyright 2016 the GPflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.from __future__ import print_function

from functools import reduce
import unittest
import gpflow
import tensorflow as tf
import numpy as np

from testing.gpflow_testcase import GPflowTestCase
from gpflow import settings


float_type = settings.dtypes.float_type
np_float_type = np.float32 if float_type is tf.float32 else np.float64

try:
    import cPickle as pickle
except ImportError:
    import pickle


class NamingTests(GPflowTestCase):
    def test_unnamed(self):
        p = gpflow.param.Param(1)
        self.assertTrue(p.name == 'unnamed')

    def test_bad_parent(self):
        p = gpflow.param.Param(1)
        m = gpflow.model.Model()
        p._parent = m  # do not do this.
        with self.assertRaises(ValueError):
            print(p.name)


class ParamTestsScalar(GPflowTestCase):
    def setUp(self):
        self.m = gpflow.param.Parameterized()
        self.m.p = gpflow.param.Param(1.0)

    def testAssign(self):
        with self.test_session():
            self.m.p = 2.0
            self.assertTrue(isinstance(self.m.p, gpflow.param.Param))
            self.assertTrue(self.m.get_free_state() == 2.0)

    def testValue(self):
        # make sure the correct value is returned
        self.m.p = 3.0
        self.assertTrue(isinstance(self.m.p.value, np.ndarray))

        # make sure assignment does not work

        with self.assertRaises(AttributeError):
            self.m.p.value = 2.53

        # make sure we get a copy
        self.assertFalse(self.m.p.value is self.m.p._array)

    def testReplacement(self):
        old_p = self.m.p
        new_p = gpflow.param.Param(3.0)
        self.m.p = new_p
        # Parameterized instances should not have _needs_recompile
        self.assertFalse(hasattr(self.m, '_needs_recompile'))
        self.assertFalse(old_p.highest_parent is self.m)

    def testHighestParent(self):
        self.assertTrue(self.m.p.highest_parent is self.m)

    def testName(self):
        self.assertTrue(self.m.p.name == 'p')

    def testFixing(self):
        self.m.p.fixed = False
        self.m.fixed = True
        self.assertTrue(self.m.p.fixed)
        self.m.p.fixed = False
        self.assertFalse(self.m.fixed)

    def testFixedFreeState(self):
        with self.test_session():
            self.assertTrue(len(self.m.get_free_state()) == 1)
            self.m.set_state(np.ones(1))
            self.m.fixed = True
            self.assertTrue(len(self.m.get_free_state()) == 0)
            self.m.set_state(np.ones(0))

    def testMakeTF(self):
        with self.test_session():
            x = tf.placeholder('float64')

            l = self.m.make_tf_array(x)
            self.assertTrue(l == 1)

            l = self.m.p.make_tf_array(x)
            self.assertTrue(l == 1)

    def testFreeState(self):
        with self.test_session():
            xx = self.m.get_free_state()
            self.assertTrue(np.allclose(xx, np.ones(1)))

            y = np.array([34.0], np_float_type)
            self.m.set_state(y)
            self.assertTrue(np.allclose(self.m.get_free_state(), y))

    def testFixed(self):
        with self.test_session():
            self.m.p.fixed = True
            self.assertTrue(len(self.m.get_free_state()) == 0)
            self.assertTrue(self.m.make_tf_array(tf.placeholder(float_type)) == 0)

    def testRecompile(self):
        with self.test_session():
            self.m._needs_recompile = False
            self.m.p.fixed = True
            self.assertTrue(self.m._needs_recompile)

            self.m._needs_recompile = False
            self.m.p.prior = gpflow.priors.Gaussian(0, 1)
            self.assertTrue(self.m._needs_recompile)

    def testTFMode(self):
        with self.test_session():
            x = tf.placeholder('float64')
            self.m.make_tf_array(x)
            self.assertTrue(isinstance(self.m.p, gpflow.param.Param))
            with self.m.tf_mode():
                self.assertTrue(isinstance(self.m.p, tf.Tensor))


class ParamTestsDeeper(GPflowTestCase):
    def setUp(self):
        with self.test_session():
            self.m = gpflow.param.Parameterized()
            self.m.foo = gpflow.param.Parameterized()
            self.m.foo.bar = gpflow.param.Parameterized()
            self.m.foo.bar.baz = gpflow.param.Param(1.0)

    def testHighestParent(self):
        self.assertTrue(self.m.foo.highest_parent is self.m)
        self.assertTrue(self.m.foo.bar.highest_parent is self.m)
        self.assertTrue(self.m.foo.bar.baz.highest_parent is self.m)

    def testReplacement(self):
        old_p = self.m.foo.bar.baz
        new_p = gpflow.param.Param(3.0)
        self.m.foo.bar.baz = new_p
        # Parameterized instances should not have _needs_recompile
        self.assertFalse(hasattr(self.m, '_needs_recompile'))
        self.assertFalse(old_p.highest_parent is self.m)

    def testReplacement2(self):
        old_p = self.m.foo.bar
        new_p = gpflow.param.Parameterized()
        new_p.baz = gpflow.param.Param(3.0)
        self.m.foo.bar = new_p
        self.assertTrue(new_p.baz.highest_parent is self.m)
        self.assertFalse(old_p.highest_parent is self.m)

    def testName(self):
        self.assertTrue(self.m.foo.name == 'foo')
        self.assertTrue(self.m.foo.bar.name == 'bar')
        self.assertTrue(self.m.foo.bar.baz.name == 'baz')

    def testMakeTF(self):
        with self.test_session():
            x = tf.placeholder('float64')

            l = self.m.make_tf_array(x)
            self.assertTrue(l == 1)

            l = self.m.foo.make_tf_array(x)
            self.assertTrue(l == 1)

            l = self.m.foo.bar.make_tf_array(x)
            self.assertTrue(l == 1)

            l = self.m.foo.bar.baz.make_tf_array(x)
            self.assertTrue(l == 1)

    def testFreeState(self):
        with self.test_session():
            xx = self.m.get_free_state()
            self.assertTrue(np.allclose(xx, np.ones(1)))

            y = np.array([34.0], np_float_type)
            self.m.set_state(y)
            self.assertTrue(np.allclose(self.m.get_free_state(), y))

    def testFixed(self):
        self.m.foo.bar.baz.fixed = True
        self.assertTrue(len(self.m.get_free_state()) == 0)

    def testFixing(self):
        self.m.fixed = False
        self.m.foo.bar.fixed = True
        self.assertTrue(self.m.fixed)
        self.assertTrue(self.m.foo.fixed)
        self.assertTrue(self.m.foo.bar.fixed)
        self.assertTrue(self.m.foo.bar.baz.fixed)
        self.m.foo.bar.baz.fixed = False
        self.assertFalse(self.m.fixed)
        self.assertFalse(self.m.foo.fixed)
        self.assertFalse(self.m.foo.bar.fixed)
        self.assertFalse(self.m.foo.bar.baz.fixed)

    def testRecompile(self):
        with self.test_session():
            self.m._needs_recompile = False
            self.m.foo.bar.baz.fixed = True
            self.assertTrue(self.m._needs_recompile)

            self.m._needs_recompile = False
            self.m.foo.bar.baz.prior = gpflow.priors.Gaussian(0, 1)
            self.assertTrue(self.m._needs_recompile)

    def testTFMode(self):
        with self.test_session():
            x = tf.placeholder('float64')

            self.m.make_tf_array(x)
            self.assertTrue(isinstance(self.m.foo.bar.baz, gpflow.param.Param))
            with self.m.tf_mode():
                self.assertTrue(isinstance(self.m.foo.bar.baz, tf.Tensor))


class ParamTestsWider(GPflowTestCase):
    def setUp(self):
        self.m = gpflow.param.Parameterized()
        self.m.foo = gpflow.param.Param(1.0)
        self.m.bar = gpflow.param.Param(np.arange(10))
        self.m.baz = gpflow.param.Param(np.random.randn(3, 3))

    def testHighestParent(self):
        self.assertTrue(self.m.foo.highest_parent is self.m)
        self.assertTrue(self.m.bar.highest_parent is self.m)
        self.assertTrue(self.m.baz.highest_parent is self.m)

    def testName(self):
        self.assertTrue(self.m.foo.name == 'foo')
        self.assertTrue(self.m.bar.name == 'bar')
        self.assertTrue(self.m.baz.name == 'baz')

    def testMakeTF(self):
        with self.test_session():
            x = tf.placeholder('float64')

            l = self.m.make_tf_array(x)
            self.assertTrue(l == 20)

            l = self.m.foo.make_tf_array(x)
            self.assertTrue(l == 1)

            l = self.m.bar.make_tf_array(x)
            self.assertTrue(l == 10)

            l = self.m.baz.make_tf_array(x)
            self.assertTrue(l == 9)

    def testFreeState(self):
        with self.test_session():
            xx = self.m.get_free_state()
            self.assertTrue(len(xx) == 20)

            y = np.random.randn(20)
            self.m.set_state(y)

            self.assertTrue(np.allclose(self.m.get_free_state(), y))

    def testIndexParam(self):
        with self.test_session():
            fs = self.m.get_free_state()
            for p in [self.m.foo, self.m.bar, self.m.baz]:
                index, found = self.m.get_param_index(p)
                self.assertTrue(found)
                self.assertTrue(fs[index] == p.get_free_state()[0])

    def testFixed(self):
        with self.test_session():
            self.m.foo.fixed = True
            self.assertTrue(len(self.m.get_free_state()) == 19)

            self.m.foo.fixed = False
            self.m.bar.fixed = True
            self.assertTrue(len(self.m.get_free_state()) == 10)

    def testFixing(self):
        self.m.fixed = False
        self.m.foo.fixed = True
        self.assertFalse(self.m.fixed)
        self.assertTrue(self.m.foo.fixed)
        self.assertFalse(self.m.bar.fixed)
        self.assertFalse(self.m.baz.fixed)
        self.m.bar.fixed = True
        self.m.baz.fixed = True
        self.assertTrue(self.m.fixed)
        self.assertTrue(self.m.foo.fixed)
        self.assertTrue(self.m.bar.fixed)
        self.assertTrue(self.m.baz.fixed)

    def testRecompile(self):
        with self.test_session():
            self.m._needs_recompile = False
            self.m.foo.fixed = True
            self.assertTrue(self.m._needs_recompile)

            self.m._needs_recompile = False
            self.m.bar.prior = gpflow.priors.Gaussian(0, 1)
            self.assertTrue(self.m._needs_recompile)

    def testTFMode(self):
        with self.test_session():
            x = tf.placeholder('float64')
            self.m.make_tf_array(x)
            self.assertTrue(all([isinstance(p, gpflow.param.Param) for p in (self.m.foo, self.m.bar, self.m.baz)]))
            with self.m.tf_mode():
                self.assertTrue(all([isinstance(p, tf.Tensor)
                                     for p in (self.m.foo, self.m.bar, self.m.baz)]))


class SingleParamterizedInvariantTest(GPflowTestCase):
    """
    Tests that invariants of only allowing a single reference to a given Parameterized in a tree
    """
    def testSelfReference(self):
        """
        Test we raise when a Parameterized object references itself
        """
        m = gpflow.param.Parameterized()

        with self.assertRaises(ValueError):
            m.foo = m

    def testReferenceBelow(self):
        """
        Test we raise when we reference the same Parameterized object in a descendent node
        """
        m = gpflow.param.Parameterized()
        m.foo = gpflow.param.Parameterized()

        with self.assertRaises(ValueError):
            m.foo.bar = m

    def testReferenceAbove(self):
        """
        Test we raise when we reference the same Parameterized object in an ancestor node
        """
        m = gpflow.param.Parameterized()
        m.foo = gpflow.param.Parameterized()
        m.foo.bar = gpflow.param.Parameterized()

        with self.assertRaises(ValueError):
            m.baz = m.foo.bar

    def testReferenceAccross(self):
        """
        Test we raise when we reference the same Parameterized object in a sibling node
        """
        m = gpflow.param.Parameterized()
        m.foo = gpflow.param.Parameterized()
        m.foo.bar = gpflow.param.Parameterized()

        m.boo = gpflow.param.Parameterized()

        with self.assertRaises(ValueError):
            m.boo.far = m.foo.bar

    def testAddingToAnother(self):
        """
        Adding the same Paramterized object to another tree is fine.
        """
        m1 = gpflow.param.Parameterized()
        m1.foo = gpflow.param.Parameterized()

        m2 = gpflow.param.Parameterized()
        m2.foo = m1.foo

    def testReassign(self):
        """
        We should be able to reassign the same value to the same param
        """
        m1 = gpflow.param.Parameterized()
        p = gpflow.param.Parameterized()
        m1.foo = p  # assign
        m1.foo = p  # reassign


class SingleParamInvariantTest(GPflowTestCase):
    """
    Tests that invariants of only allowing a single reference to a given Param in a tree
    """
    def testReferenceBelow(self):
        """
        Test we raise when the same Param object is added further down the tree
        """
        m = gpflow.param.Parameterized()
        m.p = gpflow.param.Param(1)
        m.foo = gpflow.param.Parameterized()

        with self.assertRaises(ValueError):
            m.foo.p = m.p

    def testReferenceAbove(self):
        """
        Test we raise when we reference the same Param object in a an ancestor node
        """
        m = gpflow.param.Parameterized()
        m.foo = gpflow.param.Parameterized()
        m.foo.p = gpflow.param.Param(1)

        with self.assertRaises(ValueError):
            m.p = m.foo.p

    def testReferenceAccross(self):
        """
        Test we raise when we reference the same Param object in a sibling node
        """
        m = gpflow.param.Parameterized()
        m.foo = gpflow.param.Parameterized()
        m.foo.p = gpflow.param.Param(1)

        m.bar = gpflow.param.Parameterized()

        with self.assertRaises(ValueError):
            m.bar.p = m.foo.p

    def testAddingToAnother(self):
        """
        Adding the same Param object to another tree is fine.
        """
        m1 = gpflow.param.Parameterized()
        m1.foo = gpflow.param.Param(1)

        m2 = gpflow.param.Parameterized()
        m2.foo = m1.foo

    def testReassign(self):
        """
        We should be able to reassign the same value to the same param
        """
        m1 = gpflow.param.Parameterized()
        p = gpflow.param.Param(1)
        m1.foo = p  # assign
        m1.foo = p  # reassign


class TestParamList(GPflowTestCase):
    def test_construction(self):
        gpflow.param.ParamList([])
        gpflow.param.ParamList([gpflow.param.Param(1)])
        with self.assertRaises(AssertionError):
            gpflow.param.ParamList([gpflow.param.Param(1), 'stringsnotallowed'])
        with self.assertRaises(AssertionError):
            # tuples not valid in constuctor:
            gpflow.param.ParamList((gpflow.param.Param(1),))
        with self.assertRaises(AssertionError):
            # param objects not valid in constructor (must be in list)
            gpflow.param.ParamList(gpflow.param.Param(1))

    def test_naming(self):
        p1 = gpflow.param.Param(1.2)
        p2 = gpflow.param.Param(np.array([3.4, 5.6], np_float_type))
        gpflow.param.ParamList([p1, p2])
        self.assertTrue(p1.name == 'item0')
        self.assertTrue(p2.name == 'item1')

    def test_connected(self):
        with self.test_session():
            p1 = gpflow.param.Param(1.2)
            p2 = gpflow.param.Param(np.array([3.4, 5.6], np_float_type))
            l = gpflow.param.ParamList([p1, p2])
            x = l.get_free_state()
            x.sort()
            self.assertTrue(np.all(x == np.array([1.2, 3.4, 5.6], np_float_type)))

    def test_setitem(self):
        p1 = gpflow.param.Param(1.2)
        p2 = gpflow.param.Param(np.array([3.4, 5.6], np_float_type))
        l = gpflow.param.ParamList([p1, p2])

        l[0] = 1.2
        self.assertTrue(p1._array == 1.2)

        l[1] = np.array([1.1, 2.2], np_float_type)
        self.assertTrue(np.all(p2._array == np.array([1.1, 2.2], np_float_type)))

        with self.assertRaises(TypeError):
            l[0] = gpflow.param.Param(12)

    def test_append(self):
        p1 = gpflow.param.Param(1.2)
        p2 = gpflow.param.Param(np.array([3.4, 5.6], np_float_type))
        l = gpflow.param.ParamList([p1])
        l.append(p2)
        self.assertTrue(p2 in l.sorted_params)
        with self.assertRaises(AssertionError):
            l.append('foo')

    def test_len(self):
        p1 = gpflow.param.Param(1.2)
        p2 = gpflow.param.Param(np.array([3.4, 5.6], np_float_type))
        l = gpflow.param.ParamList([p1])
        l.append(p2)
        self.assertTrue(len(l) == 2)

    def test_with_parameterized(self):
        with self.test_session():
            pzd = gpflow.param.Parameterized()
            p = gpflow.param.Param(1.2)
            pzd.p = p
            l = gpflow.param.ParamList([pzd])

            # test assignment:
            l[0].p = 5
            self.assertTrue(l.get_free_state() == 5)

            # test to make sure tf_mode get turned on and off
            self.assertFalse(pzd._tf_mode)
            with l.tf_mode():
                self.assertTrue(pzd._tf_mode)
            self.assertFalse(pzd._tf_mode)

    def test_in_model(self):
        class Foo(gpflow.model.Model):
            def __init__(self):
                gpflow.model.Model.__init__(self)
                self.l = gpflow.param.ParamList([
                    gpflow.param.Param(1), gpflow.param.Param(12)])

            def build_likelihood(self):
                return -reduce(tf.add, [tf.square(x) for x in self.l])

        with self.test_session():
            m = Foo()
            self.assertTrue(m.get_free_state().size == 2)
            m.optimize(disp=False)
            atol = 1e-6 if np_float_type is np.float32 else 1e-8
            self.assertTrue(np.allclose(m.get_free_state(), 0., atol=atol))


class TestPickleAndDict(GPflowTestCase):
    def setUp(self):
        with self.test_session():
            rng = np.random.RandomState(0)
            X = rng.randn(10, 1)
            Y = rng.randn(10, 1)
            self.m = gpflow.gpr.GPR(X, Y, kern=gpflow.kernels.RBF(1))

    def test(self):
        # pickle and reload the model
        s1 = pickle.dumps(self.m)
        m1 = pickle.loads(s1)
        d1 = self.m.get_parameter_dict()
        d2 = m1.get_parameter_dict()
        for key, val in d1.items():
            assert np.all(val == d2[key])


class TestDictEmpty(GPflowTestCase):
    def setUp(self):
        self.m = gpflow.model.Model()

    def test(self):
        d = self.m.get_parameter_dict()
        self.assertTrue(len(d.keys()) == 0)
        self.m.set_parameter_dict(d)


class TestDictSimple(GPflowTestCase):
    def setUp(self):
        self.m = gpflow.model.Model()
        self.m.p1 = gpflow.param.Param(np.random.randn(3, 2))
        self.m.p2 = gpflow.param.Param(np.random.randn(10))

    def test(self):
        d = self.m.get_parameter_dict()
        self.assertTrue(len(d.keys()) == 2)
        state1 = self.m.get_free_state().copy()
        self.m.set_state(state1 * 0)
        self.m.set_parameter_dict(d)
        self.assertTrue(np.all(state1 == self.m.get_free_state()))


class TestDictSVGP(GPflowTestCase):
    def setUp(self):
        self.rng = np.random.RandomState(0)
        X = self.rng.randn(10, 1)
        Y = self.rng.randn(10, 1)
        Z = self.rng.randn(5, 1)
        self.m = gpflow.svgp.SVGP(
            X, Y, Z=Z,
            likelihood=gpflow.likelihoods.Gaussian(),
            kern=gpflow.kernels.RBF(1))

    def test(self):
        loglik1 = self.m.compute_log_likelihood()
        d = self.m.get_parameter_dict()

        # muck up the model
        self.m.set_state(self.rng.randn(self.m.get_free_state().size))
        loglik2 = self.m.compute_log_likelihood()

        # reset the model
        self.m.set_parameter_dict(d)
        loglik3 = self.m.compute_log_likelihood()

        self.assertFalse(np.allclose(loglik1, loglik2))
        self.assertTrue(np.allclose(loglik1, loglik3))


class TestFixWithPrior(GPflowTestCase):
    """
    This tests that models with a fixed parameter which has a prior continue to work
    """

    def test(self):
        with self.test_session():
            m = gpflow.model.Model()
            m.p = gpflow.param.Param(1.0, gpflow.transforms.positive)
            m.pp = gpflow.param.Param(1.0, gpflow.transforms.positive)
            m.p.prior = gpflow.priors.Gamma(1, 1)
            m.pp.prior = gpflow.priors.Gamma(1, 1)
            m.p.fixed = True
            m.build_likelihood = lambda: tf.zeros([1], tf.float64)
            m.optimize(disp=1, maxiter=10)

class TestRandomizeDefault(GPflowTestCase):
    """
    This tests that distributions can sample random values without priors
    """

    def test(self):
        with self.test_session():
            np.random.seed(1)
            m = gpflow.model.Model()
            m.p = gpflow.param.Param(1.0)
            m.pp = gpflow.param.Param(1.0, gpflow.transforms.Log1pe())
            m.pf = gpflow.param.Param(1.0)
            m.pf.fixed = True

            m.pmd = gpflow.param.Param(np.ones((5, 2)))
            ltr = gpflow.transforms.LowerTriangular(1,2).forward(np.ones(2 * 10))
            m.pmd2 = gpflow.param.Param(
                ltr, transform=gpflow.transforms.LowerTriangular(1,2))

            #should work as (pseudo) random vals a.s. are not 1.0
            m.p.randomize()
            self.assertFalse(m.p.value == 1.0)
            m.pp.randomize()
            self.assertFalse(m.pp.value == 1.0 or m.pp.value <= 0.0)

            #check if fixing works
            m.pf.randomize()
            self.assertTrue(m.pf.value == 1.0)
            m.pf.randomize(skipfixed=False)
            self.assertFalse(m.pf.value == 1.0)

            #check multidimensional
            pmd_shape = m.pmd.shape
            m.pmd.randomize()
            self.assertFalse(np.any(m.pmd.value == 1.0))
            self.assertEquals(m.pmd.shape, pmd_shape)

            #check non size-preserving transform
            pmd2_shape = m.pmd2.shape
            m.pmd2.randomize()
            self.assertFalse(np.any(m.pmd2.value == 1.0))
            self.assertEquals(m.pmd2.shape, pmd2_shape)

class TestRandomizePrior(GPflowTestCase):
    """
    This tests that distributions can sample random values from priors
    """

    def test(self):
        with self.test_session():
            np.random.seed(1)
            from inspect import getargspec

            m = gpflow.model.Model()
            m.p = gpflow.param.Param(1.0)
            m.pmd = gpflow.param.Param(
                np.eye(5), transform=gpflow.transforms.DiagMatrix())

            priors = [obj for obj in gpflow.priors.__dict__.values() if
                      isinstance(obj, type) and
                      issubclass(obj, gpflow.priors.Prior) and
                      obj is not gpflow.priors.Prior]

            with self.assertRaises(NotImplementedError):
                m.p = 1.0
                m.p.prior = gpflow.priors.Prior()
                m.p.randomize()

            for prior in priors:
                signature = getargspec(prior.__init__)
                params = {}
                if signature.defaults is not None:
                    param_names = signature.args[:-len(signature.defaults)]
                else:
                    param_names = signature.args
                for param in param_names:
                    if param not in params.keys() and param is not 'self':
                        params[param] = 1.

                m.p = 1.0
                m.p.prior = prior(**params)
                m.pmd.prior = prior(**params)
                m.p.randomize()
                m.pmd.randomize()
                self.assertFalse(m.p.value == 1.0)
                self.assertFalse(np.any(m.pmd.value == np.ones(5)))
                self.assertTrue(m.pmd.value.shape == (5,5))


class TestRandomizeFeedPriors(GPflowTestCase):
    """
    Test if standard randomize behavior can be overriden using
    distributions keyword.
    """

    def test(self):
        with self.test_session():
            np.random.seed(1)
            m = gpflow.model.Model()
            m.p = gpflow.param.Param(1.0)
            with self.assertRaises(NotImplementedError):
                m.p.randomize(distributions={m.p: gpflow.priors.Prior()})
            m.p.randomize(distributions={m.p: gpflow.priors.Gaussian(0, 1)})
            self.assertFalse(m.p.value == 1.0)


class TestRandomizeHierarchical(GPflowTestCase):
    """
    This tests that models can randomize all contained parameters
    """

    def test(self):
        with self.test_session():
            np.random.seed(1)
            m = gpflow.model.Model()
            m.p = gpflow.param.Param(1.0)
            m.p2 = gpflow.param.Param(1.0)
            m.m = gpflow.model.Model()
            m.m.p = gpflow.param.Param(1.0)
            m.m.p2 = gpflow.param.Param(1.0)

            m.p2.prior = gpflow.priors.Gaussian(0, 1)
            m.m.p2.prior = gpflow.priors.Gaussian(0, 1)
            m.randomize()

            self.assertFalse(m.p.value == 1.0)
            self.assertFalse(m.p2.value == 1.0)
            self.assertFalse(m.m.p.value == 1.0)
            self.assertFalse(m.m.p2.value == 1.0)


class TestScopes(GPflowTestCase):
    def setUp(self):
        with self.test_session():
            rng = np.random.RandomState(0)
            X = rng.randn(10, 1)
            k = gpflow.kernels.RBF(1)
            Y = rng.randn(10, 1)
            self.m = gpflow.gpr.GPR(X, Y, k)
            self.m.compile()

    def test_likelihood_name(self):
        with self.test_session():
            with self.m.tf_mode():
                l = self.m.build_likelihood()
            expected_name = self.m.name + '.build_likelihood'
            self.assertTrue(expected_name in l.name)

    def test_kern_name(self):
        with self.test_session():
            with self.m.tf_mode():
                K = self.m.kern.K(self.m.X)
            self.assertTrue('kern.K' in K.name)


if __name__ == "__main__":
    unittest.main()
back to top