Raw File
# Copyright 2016 the GPflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import warnings
import tensorflow as tf

import numpy as np
from numpy.testing import assert_allclose, assert_equal


import gpflow
from gpflow.transforms import Chain, Identity
from gpflow.test_util import GPflowTestCase
from gpflow import settings


class TransformTests(GPflowTestCase):
    def prepare(self):
        x_np = np.random.randn(10).astype(settings.float_type)
        transforms = []
        for transform_class in gpflow.transforms.Transform.__subclasses__():
            if transform_class == Chain:
                continue  # Chain transform cannot be tested on its own
            if transform_class == gpflow.transforms.LowerTriangular:
                pass  # Test triangular transforms separately.
            elif transform_class == gpflow.transforms.DiagMatrix:
                transforms.append(transform_class(5))
            else:
                transform = transform_class()
                transforms.append(transform)
                transforms.append(Chain(Identity(), transform))
                transforms.append(Chain(transform, Identity()))

        transforms.append(gpflow.transforms.Logistic(7.3, 19.4))

        # test __call__() and chaining:
        transforms.append(gpflow.transforms.positive(gpflow.transforms.Rescale(7.5)))
        transforms.append(gpflow.transforms.Rescale(9.5)(gpflow.transforms.positive))

        # test helper:
        transforms.append(gpflow.transforms.positiveRescale(9.5))

        return tf.convert_to_tensor(x_np), x_np, transforms

    def test_tf_np_forward_backward(self):
        """
        Make sure the np forward transforms are the same as the tensorflow ones
        """
        with self.test_context() as session:
            x, x_np, transforms = self.prepare()
            for t in transforms:
                y_tf = t.forward_tensor(x)
                y_np = t.forward(x_np)
                assert_allclose(session.run(y_tf), y_np)

    def test_forward_backward(self):
        with self.test_context() as session:
            x, x_np, transforms = self.prepare()
            for t in transforms:
                y_np_res = t.forward(x_np)
                y_tf = t.forward_tensor(x)
                y_tf_res = session.run(y_tf)

                assert_allclose(y_np_res, y_tf_res)

                x_np_res = t.backward(y_np_res)
                x_tf = t.backward_tensor(y_tf)
                x_tf_res = session.run(x_tf)

                assert_allclose(x_np_res, x_tf_res)
                x_expect = x_np.reshape(x_np_res.shape)
                assert_allclose(x_expect, x_np_res)
                assert_allclose(x_expect, x_tf_res)

    def test_logjac(self):
        """
        We have hand-crafted the log-jacobians for speed. Check they're correct
        wrt a tensorflow derived version
        """
        with self.test_context() as session:
            x, x_np, transforms = self.prepare()

            # there is no jacobian: loop manually
            def jacobian(f):
                return tf.stack([tf.gradients(f(x)[i], x)[0] for i in range(10)])

            tf_jacs = [tf.log(tf.matrix_determinant(jacobian(t.forward_tensor)))
                       for t in transforms
                       if not isinstance(t, (gpflow.transforms.LowerTriangular,
                                             gpflow.transforms.DiagMatrix))]
            hand_jacs = [t.log_jacobian_tensor(x)
                         for t in transforms
                         if not isinstance(t, (gpflow.transforms.LowerTriangular,
                                               gpflow.transforms.DiagMatrix))]

            for j1, j2 in zip(tf_jacs, hand_jacs):
                j1_res = session.run(j1)
                j2_res = session.run(j2)
                assert_allclose(j1_res, j2_res)

    def test_logistic_error_wrong_order(self):
        with self.assertRaises(ValueError):
            gpflow.transforms.Logistic(8.0, 4.7)

    def test_logistic_error_bounds_equal(self):
        with self.assertRaises(ValueError):
            gpflow.transforms.Logistic(4.7, 4.7)

    def test_bad_chain_argument(self):
        t = gpflow.transforms.Logistic(1.0, 2.0)
        with self.assertRaises(TypeError):
            t(1.5)  # this syntax chains transforms, is not equivalent to t.forward(x)


class TestChainIdentity(GPflowTestCase):
    def prepare(self):
        x_np = np.random.randn(10).astype(settings.float_type)
        transforms = []
        for transform in gpflow.transforms.Transform.__subclasses__():
            if transform != Chain and transform != gpflow.transforms.LowerTriangular:
                transforms.append(transform())
        transforms.append(gpflow.transforms.Logistic(7.3, 19.4))
        return tf.convert_to_tensor(x_np), x_np, transforms

    def assertEqualElements(self, lst):
        elem0 = lst[0]
        for elemi in lst[1:]:
            assert_equal(elem0, elemi)

    def test_equivalence(self):
        """
        Make sure chaining with identity doesn't lead to different values.
        """
        with self.test_context() as session:
            x, x_np, transforms = self.prepare()
            for transform in transforms:
                equiv_transforms = [transform,
                                    Chain(transform, Identity()),
                                    Chain(Identity(), transform)]

                ys_np = [t.forward(x_np) for t in equiv_transforms]
                self.assertEqualElements(ys_np)

                y_np = ys_np[0]
                xs_np = [t.backward(y_np) for t in equiv_transforms]
                self.assertEqualElements(xs_np)

                ys = [t.forward_tensor(x) for t in equiv_transforms]
                ys_tf = [session.run(y) for y in ys]
                self.assertEqualElements(ys_tf)

                logjs = [t.log_jacobian_tensor(x) for t in equiv_transforms]
                logjs_tf = [session.run(logj) for logj in logjs]
                self.assertEqualElements(logjs_tf)


class TestOverflow(GPflowTestCase):
    """
    Bug #302 identified an overflow in the standard positive transform. This is a regression test.
    """

    def setUp(self):
        self.t = gpflow.transforms.Log1pe()

    def testOverflow(self):
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter('always')
            y = self.t.forward(np.array([-1000, -300, -10, 10, 300, 1000]))
            self.assertEqual(len(w), 0)

        self.assertFalse(np.any(np.isinf(y)))
        self.assertFalse(np.any(np.isnan(y)))

    def testDivByZero(self):
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter('always')
            y = self.t.backward(np.array([self.t._lower]))
            self.assertTrue(len(w) == 0)

        self.assertFalse(np.any(np.isinf(y)))
        self.assertFalse(np.any(np.isnan(y)))


class TestMatrixTransforms(GPflowTestCase):
    """
    Some extra tests for the matrix transformations.
    """
    def test_LowerTriangular(self):
        t = gpflow.transforms.LowerTriangular(N=3, num_matrices=2)
        t.forward(np.ones((2, 6)))
        with self.assertRaises(ValueError):
            t.forward(np.ones((2, 7)))

    def test_LowerTriangular_squeezes_only_first_axis(self):
        t = gpflow.transforms.LowerTriangular(1, 1, squeeze=True)
        ret = t.forward(np.ones((1, 1)))
        self.assertEqual(ret.shape, (1, 1))

    def test_LowerTriangularConsistency(self):
        t = gpflow.transforms.LowerTriangular(2, 4)
        M = np.random.randn(4, 2, 2) * np.tri(2, 2)[None, :, :]

        M_free = t.backward(M)
        assert_allclose(M, t.forward(M_free))

        with self.test_context() as session:
            M_free_tf = session.run(t.backward_tensor(tf.identity(M)))
            assert_allclose(M_free, M_free_tf)
            assert_allclose(M, session.run(t.forward_tensor(tf.identity(M_free_tf))))

    def test_LowerTriangular_squeezes(self):
        t1 = gpflow.transforms.LowerTriangular(N=3, num_matrices=1)
        t2 = gpflow.transforms.LowerTriangular(N=3, num_matrices=1, squeeze=True)
        X = np.random.randn(1, 6)
        Y = np.random.randn(1, 3, 3)

        self.assertTrue(np.all(t1.forward(X).squeeze() == t2.forward(X)))
        self.assertTrue(np.all(t1.forward(X).squeeze().shape == t2.forward(X).shape))
        self.assertTrue(np.all(t1.backward(Y) == t2.backward(Y.squeeze())))
        self.assertTrue(np.all(t1.backward(Y).shape == t2.backward(Y.squeeze()).shape))

        with self.test_context() as session:
            self.assertTrue(np.all(session.run(tf.squeeze(t1.forward_tensor(X))) == 
                                   session.run(t2.forward_tensor(X))))

            self.assertTrue(np.all(session.run(t1.backward_tensor(Y)) == 
                                   session.run(t2.backward_tensor(Y.squeeze()))))

        # make sure we don't try to squeeze when there are more than 1 matrices.
        with self.assertRaises(ValueError):
            gpflow.transforms.LowerTriangular(N=3, num_matrices=2, squeeze=True)


    def test_DiagMatrix(self):
        t = gpflow.transforms.DiagMatrix(3)
        t.backward(np.eye(3))
        t.backward(np.eye(3)[None, :, :])
        t.backward(np.eye(3)[None, :, :] * np.array([1, 2])[:, None, None])
        with self.assertRaises(ValueError):
            t.backward(np.eye(4))
            t.backward(np.eye(2)[None, :, :] * np.array([1, 2, 3])[:, None, None])


class TestDiagMatrixTransform(GPflowTestCase):
    def setUp(self):
        self.t1 = gpflow.transforms.DiagMatrix(dim=1)
        self.t2 = gpflow.transforms.DiagMatrix(dim=3)

    def test_forward_backward(self):
        free_1d = np.random.randn(8)
        fwd1d = self.t1.forward(free_1d)
        assert_allclose(fwd1d.shape, np.array([len(free_1d), self.t1.dim, self.t1.dim]))
        assert_allclose(free_1d, self.t1.backward(fwd1d))

        size2d = 5
        free_2d = np.random.randn(size2d, self.t2.dim).flatten()
        fwd2d = self.t2.forward(free_2d)
        assert_allclose(fwd2d.shape, np.array([size2d, self.t2.dim, self.t2.dim]))
        assert_allclose(free_2d, self.t2.backward(fwd2d))

    def test_tf_np_forward(self):
        """
        Make sure the np forward transforms are the same as the tensorflow ones
        """
        with self.test_context() as session:
            free = np.random.randn(8, self.t2.dim)
            x = tf.convert_to_tensor(free)
            ys = session.run(self.t2.forward_tensor(x))
            assert_allclose(ys, self.t2.forward(free))

            free = np.random.randn(7, self.t1.dim)
            x = tf.convert_to_tensor(free)
            ys = session.run(self.t1.forward_tensor(x))
            assert_allclose(ys, self.t1.forward(free))


if __name__ == "__main__":
    tf.test.main()
back to top