https://github.com/GPflow/GPflow
Raw File
Tip revision: a382def2e4e25861c500974f6168b2bb4fa9bf94 authored by Artem Artemev on 11 November 2017, 21:54:43 UTC
Merge pull request #547 from GPflow/GPflow-1.0-RC
Tip revision: a382def
test_model.py
# Copyright 2016 the GPflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.from __future__ import print_function

import tensorflow as tf

import numpy as np
from numpy.testing import assert_almost_equal

import gpflow
from gpflow.test_util import GPflowTestCase


class Quadratic(gpflow.models.Model):
    def __init__(self):
        rng = np.random.RandomState(0)
        gpflow.models.Model.__init__(self)
        self.x = gpflow.Param(rng.randn(10))

    @gpflow.params_as_tensors
    def _build_likelihood(self):
        return tf.negative(tf.reduce_sum(tf.square(self.x)))


class TestOptimize(GPflowTestCase):
    def test_adam(self):
        with self.test_context():
            m = Quadratic()
            opt = gpflow.train.AdamOptimizer(0.01)
            opt.minimize(m, maxiter=5000)
            self.assertTrue(m.x.read_value().max() < 1e-2)

    def test_lbfgsb(self):
        with self.test_context():
            m = Quadratic()
            opt = gpflow.train.ScipyOptimizer()
            opt.minimize(m, maxiter=1000)
            self.assertTrue(m.x.read_value().max() < 1e-6)


class Empty(gpflow.models.Model):
    def _build_likelihood(self):
        return tf.convert_to_tensor(1., dtype=gpflow.settings.tf_float)


class EmptyTest(GPflowTestCase):
    def test_compile_model_without_parameters(self):
        with self.test_context():
            m = Empty()
            assert_almost_equal(m.compute_log_likelihood(), 1.0)
            assert_almost_equal(m.compute_log_prior(), 0.0)

    def test_parameters_list_empty(self):
        with self.test_context():
            m = Empty(autobuild=False)
            self.assertEqual(list(m.parameters), [])
            self.assertEqual(list(m.trainable_parameters), [])
            self.assertEqual(list(m.params), [])
            m.compile()
            self.assertEqual(list(m.parameters), [])
            self.assertEqual(list(m.trainable_parameters), [])
            self.assertEqual(list(m.params), [])

    def test_objective_tensor(self):
        with self.test_context():
            m = Empty(autobuild=False)
            self.assertEqual(m.objective, None)
            m.build()
            self.assertTrue(gpflow.misc.is_tensor(m.objective))


class ReplaceParameterTest(GPflowTestCase):

    class Origin(gpflow.models.Model):
        def __init__(self):
            super(ReplaceParameterTest.Origin, self).__init__()
            self.a = gpflow.Param(1.)
            self.b = gpflow.Param(2.)

        @gpflow.params_as_tensors
        def _build_likelihood(self):
            return tf.square(self.a) + tf.square(self.b)

    def test_replace_parameter(self):
        class OriginSuccess(ReplaceParameterTest.Origin):
            def __init__(self):
                super(OriginSuccess, self).__init__()
                self.b = gpflow.Param(np.array(3.))

        class OriginAllDataholders(ReplaceParameterTest.Origin):
            def __init__(self):
                super(OriginAllDataholders, self).__init__()
                self.a = gpflow.DataHolder(np.array(2.))
                self.b = gpflow.DataHolder(np.array(2.))

        with self.test_context():
            m0 = self.Origin()
            m0.compile()
            assert_almost_equal(m0.compute_log_likelihood(), 5.0)

            m1 = OriginSuccess()
            m1.compile()
            assert_almost_equal(m1.compute_log_likelihood(), 10.0)

            m2 = OriginAllDataholders()
            m2.compile()
            assert_almost_equal(m2.compute_log_likelihood(), 8.0)


class KeyboardRaiser:
    """
    This wraps a function and makes it raise a KeyboardInterrupt after some number of calls
    """

    def __init__(self, iters_to_raise):
        self.iters_to_raise = iters_to_raise
        self.count = 0

    def __call__(self, *a, **kw):
        self.count += 1
        if self.count >= self.iters_to_raise:
            raise KeyboardInterrupt

def setup_sgpr():
    X = np.random.randn(1000, 3)
    Y = np.random.randn(1000, 3)
    Z = np.random.randn(100, 3)
    return gpflow.models.SGPR(X, Y, Z=Z, kern=gpflow.kernels.RBF(3))

# # TODO(@awav): KeyboardInterrupt is never caught.
# class TestKeyboardCatching(GPflowTestCase):
#     def test_optimize_np(self):
#         with self.test_context():
#             m = setup_sgpr()
#             x_before = m.read_trainables()
#             opt = gpflow.train.ScipyOptimizer()
#             step = 15
#             raiser = KeyboardRaiser(step)
#             opt.minimize(m, step_callback=raiser, maxiter=1000)
#             self.assertEqual(raiser.count, step)
#             x_after = m.read_trainables()
#             before = np.hstack([np.hstack(np.hstack([x])) for x in x_before])
#             after = np.hstack([np.hstack(np.hstack([x])) for x in x_after])
#             self.assertFalse(np.allclose(before, after))

    # TODO(@awav)
    #def test_optimize_tf(self):
    #    with self.test_context():
    #        x0 = self.m.get_free_state()
    #        callback = KeyboardRaiser(5, lambda x: None)
    #        o = tf.train.AdamOptimizer()
    #        self.m.optimize(o, maxiter=10, callback=callback)
    #        x1 = self.m.get_free_state()
    #        self.assertFalse(np.allclose(x0, x1))


class TestLikelihoodAutoflow(GPflowTestCase):
    def test_lik_and_prior(self):
        with self.test_context():
            m = setup_sgpr()
            l0 = m.compute_log_likelihood()
            p0 = m.compute_log_prior()

        m.clear()

        with self.test_context():
            m.kern.variance.prior = gpflow.priors.Gamma(1.4, 1.6)
            m.compile()
            l1 = m.compute_log_likelihood()
            p1 = m.compute_log_prior()

        self.assertEqual(p0, 0.0)
        self.assertNotEqual(p0, p1)
        self.assertEqual(l0, l1)


class TestName(GPflowTestCase):
    def test_name(self):
        with self.test_context():
            m1 = Empty()
            self.assertEqual(m1.name, 'Empty')
            m2 = Empty(name='foo')
            self.assertEqual(m2.name, 'foo')


# class TestNoRecompileThroughNewModelInstance(GPflowTestCase):
#     """ Regression tests for Bug #454 """

#     def setUp(self):
#         self.X = np.random.rand(10, 2)
#         self.Y = np.random.rand(10, 1)

#     def test_gpr(self):
#         with self.test_context():
#             m1 = gpflow.models.GPR(self.X, self.Y, gpflow.kernels.Matern32(2))
#             m1.compile()
#             m2 = gpflow.models.GPR(self.X, self.Y, gpflow.kernels.Matern32(2))
#             self.assertFalse(m1._needs_recompile)

#     def test_sgpr(self):
#         with self.test_context():
#             m1 = gpflow.models.SGPR(self.X, self.Y, gpflow.kernels.Matern32(2), Z=self.X)
#             m1.compile()
#             m2 = gpflow.models.SGPR(self.X, self.Y, gpflow.kernels.Matern32(2), Z=self.X)
#             self.assertFalse(m1._needs_recompile)

#     def test_gpmc(self):
#         with self.test_context():
#             m1 = gpflow.models.GPMC(
#                 self.X, self.Y,
#                 gpflow.kernels.Matern32(2),
#                 likelihood=gpflow.likelihoods.StudentT())
#             m1.compile()
#             m2 = gpflow.models.GPMC(
#                     self.X, self.Y,
#                     gpflow.kernels.Matern32(2),
#                     likelihood=gpflow.likelihoods.StudentT())
#             self.assertFalse(m1._needs_recompile)

#     def test_sgpmc(self):
#         with self.test_context():
#             m1 = gpflow.models.SGPMC(
#                 self.X, self.Y,
#                 gpflow.kernels.Matern32(2),
#                 likelihood=gpflow.likelihoods.StudentT(),
#                 Z=self.X)
#             m1.compile()
#             m2 = gpflow.models.SGPMC(
#                 self.X, self.Y,
#                 gpflow.kernels.Matern32(2),
#                 likelihood=gpflow.likelihoods.StudentT(),
#                 Z=self.X)
#             self.assertFalse(m1._needs_recompile)

#     def test_svgp(self):
#         with self.test_context():
#             m1 = gpflow.models.SVGP(
#                 self.X, self.Y,
#                 gpflow.kernels.Matern32(2),
#                 likelihood=gpflow.likelihoods.StudentT(),
#                 Z=self.X)
#             m1.compile()
#             m2 = gpflow.models.SVGP(
#                 self.X, self.Y,
#                 gpflow.kernels.Matern32(2),
#                 likelihood=gpflow.likelihoods.StudentT(),
#                 Z=self.X)
#             self.assertFalse(m1._needs_recompile)

#     def test_vgp(self):
#         with self.test_context():
#             m1 = gpflow.models.VGP(
#                 self.X, self.Y,
#                 gpflow.kernels.Matern32(2),
#                 likelihood=gpflow.likelihoods.StudentT())
#             m1.compile()
#             m2 = gpflow.models.VGP(
#                 self.X, self.Y,
#                 gpflow.kernels.Matern32(2),
#                 likelihood=gpflow.likelihoods.StudentT())
#             self.assertFalse(m1._needs_recompile)


if __name__ == "__main__":
    tf.test.main()
back to top