https://github.com/GPflow/GPflow
Raw File
Tip revision: ff05f7e8ae2fc51529f05f4f7a849ea545d4f8cf authored by Felix Leibfried on 21 January 2020, 13:41:47 UTC
minor change to optimizer
Tip revision: ff05f7e
test_conditionals_broadcasting.py
# Copyright 2017 the GPflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
This test suite will check if the conditionals broadcast correctly
when the input tensors have leading dimensions.
"""


import numpy as np
import pytest
import tensorflow as tf
from numpy.testing import assert_allclose

import gpflow
import gpflow.multioutput.features as mf
import gpflow.multioutput.kernels as mk
from gpflow.conditionals import conditional, sample_conditional, _rollaxis_left, _rollaxis_right
from gpflow.multioutput.conditionals import _mix_latent_gp
from gpflow.test_util import session_tf


class Data:
    S1, S2, N, M = 7, 6, 4, 3
    Dx, Dy, L = 2, 5, 4  # input dim, output dim, observation dimensionality i.e. num latent GPs
    W = np.random.randn(Dy, L)  # mixing matrix

    SX = np.random.randn(S1*S2, N, Dx)
    S1_S2_X = np.reshape(SX, [S1, S2, N, Dx])

    Z = np.random.randn(M, Dx)


@pytest.mark.parametrize("full_cov", [False, True])
@pytest.mark.parametrize("white", [True, False])
@pytest.mark.parametrize("conditional_type", ["mixing", "Z", "inducing_points"])
def test_conditional_broadcasting(session_tf, full_cov, white, conditional_type):
    """
    Test that the `conditional` and `sample_conditional` broadcasts correctly
    over leading dimensions of Xnew. Xnew can be shape [..., N, D],
    and conditional should broadcast over the [...].
    """
    X_ = tf.placeholder(tf.float64, [None, None])
    q_mu = np.random.randn(Data.M, Data.Dy)
    q_sqrt = np.tril(np.random.randn(Data.Dy, Data.M, Data.M), -1)

    if conditional_type == "Z":
        feat = Data.Z
        kern = gpflow.kernels.Matern52(Data.Dx, lengthscales=0.5)
    elif conditional_type == "inducing_points":
        feat = gpflow.features.InducingPoints(Data.Z)
        kern = gpflow.kernels.Matern52(Data.Dx, lengthscales=0.5)
    elif conditional_type == "mixing":
        # variational params have different output dim in this case
        q_mu = np.random.randn(Data.M, Data.L)
        q_sqrt = np.tril(np.random.randn(Data.L, Data.M, Data.M), -1)
        feat = mf.MixedKernelSharedMof(gpflow.features.InducingPoints(Data.Z))
        kern = mk.SeparateMixedMok(
            kernels=[gpflow.kernels.Matern52(Data.Dx, lengthscales=0.5) for _ in range(Data.L)],
            W=Data.W
        )

    if conditional_type == "mixing" and full_cov:
        pytest.skip("combination is not implemented")

    num_samples = 5
    sample_tf, mean_tf, cov_tf = sample_conditional(
        X_,
        feat,
        kern,
        tf.convert_to_tensor(q_mu),
        q_sqrt=tf.convert_to_tensor(q_sqrt),
        white=white,
        full_cov=full_cov,
        num_samples=num_samples
    )

    ss, ms, vs = [], [], []
    for X in Data.SX:
        s, m, v = session_tf.run([sample_tf, mean_tf, cov_tf], {X_: X})
        ms.append(m)
        vs.append(v)
        ss.append(s)

    ms = np.array(ms)
    vs = np.array(vs)
    ss = np.array(ss)

    ss_S12, ms_S12, vs_S12 = session_tf.run(
        sample_conditional(
            Data.SX,
            feat,
            kern,
            tf.convert_to_tensor(q_mu),
            q_sqrt=tf.convert_to_tensor(q_sqrt),
            white=white,
            full_cov=full_cov,
            num_samples=num_samples
        )
    )

    ss_S1_S2, ms_S1_S2, vs_S1_S2 = session_tf.run(
        sample_conditional(
            Data.S1_S2_X,
            feat,
            kern,
            tf.convert_to_tensor(q_mu),
            q_sqrt=tf.convert_to_tensor(q_sqrt),
            white=white,
            full_cov=full_cov,
            num_samples=num_samples
        )
    )

    assert_allclose(ss_S12.shape, ss.shape)
    assert_allclose(ms_S12, ms)
    assert_allclose(vs_S12, vs)
    assert_allclose(ms_S1_S2.reshape(Data.S1 * Data.S2, Data.N, Data.Dy), ms)
    assert_allclose(ss_S1_S2.shape, [Data.S1, Data.S2, num_samples, Data.N, Data.Dy])

    if full_cov:
        assert_allclose(vs_S1_S2.reshape(Data.S1 * Data.S2, Data.Dy, Data.N, Data.N), vs)
    else:
        assert_allclose(vs_S1_S2.reshape(Data.S1 * Data.S2, Data.N, Data.Dy), vs)


# -------------------------------------------
# Test utility functions used in conditionals
# -------------------------------------------

# _mix_latent_gps
@pytest.mark.parametrize("full_cov", [True, False])
@pytest.mark.parametrize("full_output_cov", [True, False])
def test_broadcasting_mix_latent_gps(session_tf, full_cov, full_output_cov):
    S, N = 7, 20  # batch size, num data points
    P, L = 10, 5  # observation dimensionality, num latent GPs
    W = np.random.randn(P, L)  # mixing matrix
    g_mu = np.random.randn(S, N, L)  # mean of the L latent GPs

    g_sqrt_diag = np.tril(np.random.randn(S*L, N, N), -1)  # [L*S, N, N]
    g_sqrt_diag = np.reshape(g_sqrt_diag, [L, S, N, N])
    g_var_diag = g_sqrt_diag @ np.transpose(g_sqrt_diag, [0, 1, 3, 2])  # [L, S, N, N]
    g_var = np.zeros([S, N, L, N, L])
    for l in range(L):
        g_var[:, :, l, :, l] = g_var_diag[l, :, :, :]  # replace diagonal elements by g_var_diag

    # reference numpy implementation for mean
    f_mu_ref = g_mu @ W.T  # [S, N, P]

    # reference numpy implementation for variance
    g_var_tmp = np.transpose(g_var, [0, 1, 3, 2, 4])  # [S, N, N, L, L]
    f_var_ref = W @ g_var_tmp @ W.T  # [S, N, N, P, P]
    f_var_ref = np.transpose(f_var_ref, [0, 1, 3, 2, 4])  # [S, N, P, N, P]

    if not full_cov:
        g_var_diag = np.array([g_var_diag[:, :, n, n] for n in range(N)])  # [N, L, S]
        g_var_diag = np.transpose(g_var_diag, [2, 0, 1])  # [S, N, L]

    # run gpflow's implementation
    f_mu, f_var = session_tf.run(_mix_latent_gp(
        tf.convert_to_tensor(W),
        tf.convert_to_tensor(g_mu),
        tf.convert_to_tensor(g_var_diag),
        full_cov,
        full_output_cov
    ))

    # we strip down f_var_ref to the elements we need
    if not full_output_cov and not full_cov:
        f_var_ref = np.array([f_var_ref[:, :, p, :, p] for p in range(P)])  # [P, S, N, N]
        f_var_ref = np.array([f_var_ref[:, :, n, n] for n in range(N)])  # [N, P, S]
        f_var_ref = np.transpose(f_var_ref, [2, 0, 1])  # [S, N, P]

    elif not full_output_cov and full_cov:
        f_var_ref = np.array([f_var_ref[:, :, p, :, p] for p in range(P)])  # [P, S, N, N]
        f_var_ref = np.transpose(f_var_ref, [1, 0, 2, 3])  # [S, P, N, N]

    elif full_output_cov and not full_cov:
        f_var_ref = np.array([f_var_ref[:, n, :, n, :] for n in range(N)])  # [N, S, P, P]
        f_var_ref = np.transpose(f_var_ref, [1, 0, 2, 3])  # [S, N, P, P]

    else:
        pass  # f_var_ref has shape [..., N, P, N, P] as expected

    # check equality for mean and variance of f
    assert_allclose(f_mu_ref, f_mu)
    assert_allclose(f_var_ref, f_var)


# rollaxis
@pytest.mark.parametrize("rolls", [1, 2])
@pytest.mark.parametrize("direction", ["left", "right"])
def test_rollaxis(session_tf, rolls, direction):
    A = np.random.randn(10, 5, 3)
    A_tf = tf.convert_to_tensor(A)

    if direction == "left":
        perm = [1, 2, 0] if rolls == 1 else [2, 0, 1]
    elif direction == "right":
        perm = [2, 0, 1] if rolls == 1 else [1, 2, 0]

    A_rolled_ref = np.transpose(A, perm)

    if direction == "left":
        A_rolled_tf = _rollaxis_left(A_tf, rolls)
    elif direction == "right":
        A_rolled_tf = _rollaxis_right(A_tf, rolls)

    A_rolled_tf = session_tf.run(A_rolled_tf)
    assert_allclose(A_rolled_ref, A_rolled_tf)


@pytest.mark.parametrize("rolls", [1, 2])
def test_rollaxis_idempotent(session_tf, rolls):
    A = np.random.randn(10, 5, 3, 20, 1)
    A_tf = tf.convert_to_tensor(A)
    A_left_right = session_tf.run(_rollaxis_left(_rollaxis_right(A_tf, 2), 2))
    A_right_left = session_tf.run(_rollaxis_right(_rollaxis_left(A_tf, 2), 2))

    assert_allclose(A, A_left_right)
    assert_allclose(A, A_right_left)
back to top