# Copyright 2017 the GPflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import numpy as np
import pytest
import tensorflow as tf
from numpy.testing import assert_allclose

import gpflow
from gpflow.inducing_variables import InducingPoints
from gpflow.likelihoods import (Bernoulli, Beta, Exponential, Gamma, Gaussian, GaussianMC,
                                Likelihood, MonteCarloLikelihood, MultiClass, Ordinal, Poisson,
                                RobustMax, Softmax, StudentT, SwitchedLikelihood)
from gpflow.quadrature import ndiagquad
from gpflow.config import default_float, default_int

tf.random.set_seed(99012)


def filter_analytic(likelihood_setups, method_name):
    """
    Return only those setups whose likelihood overrides `method_name` with an
    analytic implementation, i.e. does not fall back to the quadrature-based
    method on the Likelihood base class.
    """

    def is_analytic(likelihood):
        assert not isinstance(likelihood, gpflow.likelihoods.MonteCarloLikelihood)
        quadrature_fallback = getattr(Likelihood, method_name)
        actual_method = getattr(likelihood.__class__, method_name)
        return actual_method is not quadrature_fallback

    return [l for l in likelihood_setups if is_analytic(get_likelihood(l))]


class Datum:
    tolerance = 1e-06
    Yshape = (10, 3)
    Y = tf.random.normal(Yshape, dtype=tf.float64)
    F = tf.random.normal(Yshape, dtype=tf.float64)
    Fmu = tf.random.normal(Yshape, dtype=tf.float64)
    Fvar = 0.01 * tf.random.normal(Yshape, dtype=tf.float64) ** 2
    Fvar_zero = tf.zeros(Yshape, dtype=tf.float64)


class LikelihoodSetup:
    def __init__(self, likelihood, Y=Datum.Y, rtol=1e-06, atol=0.0):
        self.likelihood = likelihood
        self.Y = Y
        self.rtol = rtol
        self.atol = atol

    def __repr__(self):
        name = self.likelihood.__class__.__name__
        return f"{name}-rtol={self.rtol}-atol={self.atol}"


likelihood_setups = [
    LikelihoodSetup(Gaussian()),
    LikelihoodSetup(StudentT()),
    LikelihoodSetup(Beta(), Y=tf.random.uniform(Datum.Yshape, dtype=default_float())),
    LikelihoodSetup(Ordinal(np.array([-1, 1])),
                    Y=tf.random.uniform(Datum.Yshape, 0, 3, dtype=default_int())),
    LikelihoodSetup(Poisson(invlink=tf.square),
                    Y=tf.random.poisson(Datum.Yshape, 1.0, dtype=default_float())),
    LikelihoodSetup(Exponential(invlink=tf.square),
                    Y=tf.random.uniform(Datum.Yshape, dtype=default_float())),
    LikelihoodSetup(Gamma(invlink=tf.square),
                    Y=tf.random.uniform(Datum.Yshape, dtype=default_float())),
    LikelihoodSetup(Bernoulli(invlink=tf.sigmoid),
                    Y=tf.random.uniform(Datum.Yshape, dtype=default_float())),
    LikelihoodSetup(MultiClass(3),
                    Y=tf.argmax(Datum.Y, 1).numpy().reshape(-1, 1),
                    rtol=1e-3,
                    atol=1e-3),
]


def get_likelihood(likelihood_setup):
    if isinstance(likelihood_setup, type(pytest.param())):
        likelihood_setup, = likelihood_setup.values
    return likelihood_setup.likelihood


def test_no_missing_likelihoods():
    all_likelihood_types = Likelihood.__subclasses__()
    tested_likelihood_types = [get_likelihood(l).__class__ for l in likelihood_setups]
    for likelihood_class in all_likelihood_types:
        if likelihood_class in tested_likelihood_types:
            continue  # already tested
        if likelihood_class is SwitchedLikelihood:
            continue  # tested separately
        if likelihood_class is MonteCarloLikelihood:
            continue  # abstract base class
        if issubclass(likelihood_class, MonteCarloLikelihood):
            continue  # TODO
        assert False, f"no test for likelihood class {likelihood_class}"


@pytest.mark.parametrize('likelihood_setup', likelihood_setups)
@pytest.mark.parametrize('mu, var', [[Datum.Fmu, tf.zeros_like(Datum.Fmu)]])
def test_conditional_mean_and_variance(likelihood_setup, mu, var):
    """
    Here we make sure that conditional_mean and conditional_variance give the
    same result as predict_mean_and_var if the prediction has no uncertainty.
    """
    mu1 = likelihood_setup.likelihood.conditional_mean(mu)
    var1 = likelihood_setup.likelihood.conditional_variance(mu)
    mu2, var2 = likelihood_setup.likelihood.predict_mean_and_var(mu, var)
    assert_allclose(mu1, mu2, rtol=likelihood_setup.rtol, atol=likelihood_setup.atol)
    assert_allclose(var1, var2, rtol=likelihood_setup.rtol, atol=likelihood_setup.atol)


@pytest.mark.parametrize('likelihood_setup', likelihood_setups)
def test_variational_expectations(likelihood_setup):
    """
    Here we make sure that variational_expectations gives the same result as
    log_prob if the latent function has no uncertainty.
    """
    likelihood = likelihood_setup.likelihood
    F = Datum.F
    Y = likelihood_setup.Y
    r1 = likelihood.log_prob(F, Y)
    r2 = likelihood.variational_expectations(F, tf.zeros_like(F), Y)
    assert_allclose(r1, r2, atol=likelihood_setup.atol, rtol=likelihood_setup.rtol)


@pytest.mark.parametrize('likelihood_setup',
                         filter_analytic(likelihood_setups, "variational_expectations"))
@pytest.mark.parametrize('mu, var', [[Datum.Fmu, Datum.Fvar]])
def test_quadrature_variational_expectation(likelihood_setup, mu, var):
    """
    Where the quadrature methods have been overridden with analytic
    expressions, make sure the analytic code gives results close to the
    quadrature fallback.
    """
    likelihood, y = likelihood_setup.likelihood, likelihood_setup.Y
    if isinstance(likelihood, MultiClass):
        pytest.skip("Test fails due to issue with ndiagquad (github issue #1091)")
    F1 = likelihood.variational_expectations(mu, var, y)
    F2 = ndiagquad(likelihood.log_prob, likelihood.num_gauss_hermite_points, mu, var, Y=y)
    assert_allclose(F1, F2, rtol=likelihood_setup.rtol, atol=likelihood_setup.atol)


@pytest.mark.parametrize('likelihood_setup',
                         filter_analytic(likelihood_setups, "predict_density"))
@pytest.mark.parametrize('mu, var', [[Datum.Fmu, Datum.Fvar]])
def test_quadrature_predict_density(likelihood_setup, mu, var):
    likelihood, y = likelihood_setup.likelihood, likelihood_setup.Y
    if isinstance(likelihood, MultiClass):
        pytest.skip("Test fails due to issue with ndiagquad (github issue #1091)")
    F1 = likelihood.predict_density(mu, var, y)
    F2 = Likelihood.predict_density(likelihood, mu, var, y)
    assert_allclose(F1, F2, rtol=likelihood_setup.rtol, atol=likelihood_setup.atol)


@pytest.mark.parametrize('likelihood_setup',
                         filter_analytic(likelihood_setups, "predict_mean_and_var"))
@pytest.mark.parametrize('mu, var', [[Datum.Fmu, Datum.Fvar]])
def test_quadrature_mean_and_var(likelihood_setup, mu, var):
    likelihood = likelihood_setup.likelihood
    if isinstance(likelihood, MultiClass):
        pytest.skip("Test fails due to issue with ndiagquad (github issue #1091)")
    F1m, F1v = likelihood.predict_mean_and_var(mu, var)
    F2m, F2v = Likelihood.predict_mean_and_var(likelihood, mu, var)
    assert_allclose(F1m, F2m, rtol=likelihood_setup.rtol, atol=likelihood_setup.atol)
    assert_allclose(F1v, F2v, rtol=likelihood_setup.rtol, atol=likelihood_setup.atol)


def _make_montecarlo_mu_var_y():
    mu_var_y = [tf.random.normal((3, 10), dtype=tf.float64)] * 3
    mu_var_y[1] = 0.01 * (mu_var_y[1] ** 2)
    return mu_var_y


def _make_montecarlo_likelihoods(var):
    gaussian_mc_likelihood = GaussianMC(var)
    gaussian_mc_likelihood.num_monte_carlo_points = 1000000
    return gaussian_mc_likelihood, Gaussian(var)


@pytest.mark.parametrize('likelihood_var', [0.3, 0.5, 1.])
@pytest.mark.parametrize('mu, var, y', [_make_montecarlo_mu_var_y()])
def test_montecarlo_variational_expectation(likelihood_var, mu, var, y):
    likelihood_gaussian_mc, likelihood_gaussian = _make_montecarlo_likelihoods(likelihood_var)
    assert_allclose(likelihood_gaussian_mc.variational_expectations(mu, var, y),
                    likelihood_gaussian.variational_expectations(mu, var, y),
                    rtol=5e-4,
                    atol=1e-4)


@pytest.mark.parametrize('likelihood_var', [0.3, 0.5, 1.])
@pytest.mark.parametrize('mu, var, y', [_make_montecarlo_mu_var_y()])
def test_montecarlo_predict_density(likelihood_var, mu, var, y):
    likelihood_gaussian_mc, likelihood_gaussian = _make_montecarlo_likelihoods(likelihood_var)
    assert_allclose(likelihood_gaussian_mc.predict_density(mu, var, y),
                    likelihood_gaussian.predict_density(mu, var, y),
                    rtol=5e-4,
                    atol=1e-4)


@pytest.mark.parametrize('likelihood_var', [0.3, 0.5, 1.])
@pytest.mark.parametrize('mu, var, y', [_make_montecarlo_mu_var_y()])
def test_montecarlo_predict_mean_and_var(likelihood_var, mu, var, y):
    likelihood_gaussian_mc, likelihood_gaussian = _make_montecarlo_likelihoods(likelihood_var)
    mean1, var1 = likelihood_gaussian_mc.predict_mean_and_var(mu, var)
    mean2, var2 = likelihood_gaussian.predict_mean_and_var(mu, var)
    assert_allclose(mean1, mean2, rtol=5e-4, atol=1e-4)
    assert_allclose(var1, var2, rtol=5e-4, atol=1e-4)


@pytest.mark.parametrize('num, dimF', [[10, 5], [3, 2]])
@pytest.mark.parametrize('dimY', [10, 2, 1])
def test_softmax_y_shape_assert(num, dimF, dimY):
    """
    Softmax assumes the class is given as a label (not, e.g., one-hot
    encoded), and hence just uses the first column of Y. To prevent silent
    errors, there is a tf assertion that ensures Y only has one dimension.
    This test checks that this assert works as intended.
    """
    F = tf.random.normal((num, dimF))
    dY = np.vstack((np.random.randn(num - 3, dimY), np.ones((3, dimY)))) > 0
    Y = tf.convert_to_tensor(dY, dtype=default_int())
    likelihood = Softmax(dimF)
    try:
        likelihood.log_prob(F, Y)
    except tf.errors.InvalidArgumentError as e:
        assert "Condition x == y did not hold." in e.message


@pytest.mark.parametrize('num', [10, 3])
@pytest.mark.parametrize('dimF, dimY', [[2, 1]])
def test_softmax_bernoulli_equivalence(num, dimF, dimY):
    dF = np.vstack((np.random.randn(num - 3, dimF),
                    np.array([[-3., 0.], [3., 0.], [0., 0.]])))
    dY = np.vstack((np.random.randn(num - 3, dimY), np.ones((3, dimY)))) > 0
    F = tf.cast(dF, default_float())
    Fvar = tf.exp(tf.stack([F[:, 1], -10.0 + tf.zeros(F.shape[0], dtype=F.dtype)], axis=1))
    F = tf.stack([F[:, 0], tf.zeros(F.shape[0], dtype=F.dtype)], axis=1)
    Y = tf.cast(dY, default_int())
    Ylabel = 1 - Y

    softmax_likelihood = Softmax(dimF)
    bernoulli_likelihood = Bernoulli(invlink=tf.sigmoid)
    # Minimum number of Monte Carlo points to pass the test on CircleCI:
    softmax_likelihood.num_monte_carlo_points = int(0.3e7)
    bernoulli_likelihood.num_gauss_hermite_points = 40

    assert_allclose(softmax_likelihood.conditional_mean(F)[:, :1],
                    bernoulli_likelihood.conditional_mean(F[:, :1]))
    assert_allclose(softmax_likelihood.conditional_variance(F)[:, :1],
                    bernoulli_likelihood.conditional_variance(F[:, :1]))
    assert_allclose(softmax_likelihood.log_prob(F, Ylabel),
                    bernoulli_likelihood.log_prob(F[:, :1], Y.numpy()))

    mean1, var1 = softmax_likelihood.predict_mean_and_var(F, Fvar)
    mean2, var2 = bernoulli_likelihood.predict_mean_and_var(F[:, :1], Fvar[:, :1])
    assert_allclose(mean1[:, 0, None], mean2, rtol=2e-3)
    assert_allclose(var1[:, 0, None], var2, rtol=2e-3)

    ls_ve = softmax_likelihood.variational_expectations(F, Fvar, Ylabel)
    lb_ve = bernoulli_likelihood.variational_expectations(F[:, :1], Fvar[:, :1], Y.numpy())
    assert_allclose(ls_ve[:, 0, None], lb_ve, rtol=5e-3)


@pytest.mark.parametrize('num_classes, num_points', [[10, 3]])
@pytest.mark.parametrize('tol, epsilon', [[1e-4, 1e-3]])
def test_robust_max_multiclass_symmetric(num_classes, num_points, tol, epsilon):
    """
    This test is based on the observation that for symmetric inputs the class
    predictions must have equal probability.
    """
    rng = np.random.RandomState(1)
    p = 1. / num_classes
    F = tf.ones((num_points, num_classes), dtype=default_float())
    Y = tf.convert_to_tensor(rng.randint(num_classes, size=(num_points, 1)),
                             dtype=default_float())

    likelihood = MultiClass(num_classes)
    likelihood.invlink.epsilon = tf.convert_to_tensor(epsilon, dtype=default_float())

    mu, _ = likelihood.predict_mean_and_var(F, F)
    pred = likelihood.predict_density(F, F, Y)
    variational_expectations = likelihood.variational_expectations(F, F, Y)

    expected_mu = (p * (1. - epsilon) +
                   (1. - p) * epsilon / (num_classes - 1)) * np.ones((num_points, 1))
    expected_log_density = np.log(expected_mu)

    # assert_allclose() would complain about shape mismatch
    assert np.allclose(mu, expected_mu, tol, tol)
    assert np.allclose(pred, expected_log_density, 1e-3, 1e-3)

    validation_variational_expectation = (p * np.log(1. - epsilon) +
                                          (1. - p) * np.log(epsilon / (num_classes - 1)))
    assert_allclose(variational_expectations,
                    np.ones((num_points, 1)) * validation_variational_expectation,
                    tol, tol)


@pytest.mark.parametrize('num_classes, num_points', [[5, 100]])
@pytest.mark.parametrize(
    'mock_prob, expected_prediction, tol, epsilon',
    [[0.73, -0.5499780059, 1e-4, 0.231]
     # Expected prediction evaluated on calculator:
     # log((1 - ε) * 0.73 + (1 - 0.73) * ε / (num_classes - 1))
     ])
def test_robust_max_multiclass_predict_density(num_classes, num_points, mock_prob,
                                               expected_prediction, tol, epsilon):
    class MockRobustMax(gpflow.likelihoods.RobustMax):
        def prob_is_largest(self, Y, Fmu, Fvar, gh_x, gh_w):
            return tf.ones((num_points, 1), dtype=default_float()) * mock_prob

    likelihood = MultiClass(num_classes, invlink=MockRobustMax(num_classes, epsilon))
    F = tf.ones((num_points, num_classes))
    rng = np.random.RandomState(1)
    Y = tf.cast(rng.randint(num_classes, size=(num_points, 1)), dtype=default_int())
    prediction = likelihood.predict_density(F, F, Y)

    assert_allclose(prediction, expected_prediction, tol, tol)


@pytest.mark.parametrize('num_classes', [5, 100])
@pytest.mark.parametrize('initial_epsilon, new_epsilon', [[1e-3, 0.412]])
def test_robust_max_multiclass_eps_k1_changes(num_classes, initial_epsilon, new_epsilon):
    """
    Checks that eps_k1 changes when epsilon changes. This used to not happen
    and had to be manually changed.
    """
    likelihood = RobustMax(num_classes, initial_epsilon)
    expected_eps_k1 = initial_epsilon / (num_classes - 1.)
    actual_eps_k1 = likelihood.eps_k1
    assert_allclose(expected_eps_k1, actual_eps_k1)

    likelihood.epsilon = tf.convert_to_tensor(new_epsilon, dtype=default_float())
    expected_eps_k2 = new_epsilon / (num_classes - 1.)
    actual_eps_k2 = likelihood.eps_k1
    assert_allclose(expected_eps_k2, actual_eps_k2)


@pytest.mark.parametrize('Y_list', [[tf.random.normal((i, 2)) for i in range(3, 6)]])
@pytest.mark.parametrize('F_list', [[tf.random.normal((i, 2)) for i in range(3, 6)]])
@pytest.mark.parametrize('Fvar_list', [[tf.exp(tf.random.normal((i, 2))) for i in range(3, 6)]])
@pytest.mark.parametrize('Y_label', [[tf.ones((i, 2)) * (i - 3.) for i in range(3, 6)]])
def test_switched_likelihood_log_prob(Y_list, F_list, Fvar_list, Y_label):
    """
    SwitchedLikelihood is separately tested here.
    Here, we make sure the partition-stitch works fine.
    """
    Y_perm = list(range(3 + 4 + 5))
    np.random.shuffle(Y_perm)  # shuffle the original data
    Y_sw = np.hstack([np.concatenate(Y_list), np.concatenate(Y_label)])[Y_perm, :3]
    F_sw = np.concatenate(F_list)[Y_perm, :]

    # Use distinct likelihood objects so that each keeps its own variance.
    likelihoods = [Gaussian() for _ in range(3)]
    for lik in likelihoods:
        lik.variance = np.exp(np.random.randn(1)).squeeze().astype(np.float32)
    switched_likelihood = SwitchedLikelihood(likelihoods)

    switched_results = switched_likelihood.log_prob(F_sw, Y_sw)
    results = [lik.log_prob(f, y) for lik, y, f in zip(likelihoods, Y_list, F_list)]
    assert_allclose(switched_results, np.concatenate(results)[Y_perm, :])


@pytest.mark.parametrize('Y_list', [[tf.random.normal((i, 2)) for i in range(3, 6)]])
@pytest.mark.parametrize('F_list', [[tf.random.normal((i, 2)) for i in range(3, 6)]])
@pytest.mark.parametrize('Fvar_list', [[tf.exp(tf.random.normal((i, 2))) for i in range(3, 6)]])
@pytest.mark.parametrize('Y_label', [[tf.ones((i, 2)) * (i - 3.) for i in range(3, 6)]])
def test_switched_likelihood_predict_density(Y_list, F_list, Fvar_list, Y_label):
    Y_perm = list(range(3 + 4 + 5))
    np.random.shuffle(Y_perm)  # shuffle the original data
    Y_sw = np.hstack([np.concatenate(Y_list), np.concatenate(Y_label)])[Y_perm, :3]
    F_sw = np.concatenate(F_list)[Y_perm, :]
    Fvar_sw = np.concatenate(Fvar_list)[Y_perm, :]

    likelihoods = [Gaussian() for _ in range(3)]
    for lik in likelihoods:
        lik.variance = np.exp(np.random.randn(1)).squeeze().astype(np.float32)
    switched_likelihood = SwitchedLikelihood(likelihoods)

    switched_results = switched_likelihood.predict_density(F_sw, Fvar_sw, Y_sw)
    results = [lik.predict_density(f, fvar, y)
               for lik, y, f, fvar in zip(likelihoods, Y_list, F_list, Fvar_list)]
    assert_allclose(switched_results, np.concatenate(results)[Y_perm, :])


@pytest.mark.parametrize('Y_list', [[tf.random.normal((i, 2)) for i in range(3, 6)]])
@pytest.mark.parametrize('F_list', [[tf.random.normal((i, 2)) for i in range(3, 6)]])
@pytest.mark.parametrize('Fvar_list', [[tf.exp(tf.random.normal((i, 2))) for i in range(3, 6)]])
@pytest.mark.parametrize('Y_label', [[tf.ones((i, 2)) * (i - 3.) for i in range(3, 6)]])
def test_switched_likelihood_variational_expectations(Y_list, F_list, Fvar_list, Y_label):
    Y_perm = list(range(3 + 4 + 5))
    np.random.shuffle(Y_perm)  # shuffle the original data
    Y_sw = np.hstack([np.concatenate(Y_list), np.concatenate(Y_label)])[Y_perm, :3]
    F_sw = np.concatenate(F_list)[Y_perm, :]
    Fvar_sw = np.concatenate(Fvar_list)[Y_perm, :]

    likelihoods = [Gaussian() for _ in range(3)]
    for lik in likelihoods:
        lik.variance = np.exp(np.random.randn(1)).squeeze().astype(np.float32)
    switched_likelihood = SwitchedLikelihood(likelihoods)

    switched_results = switched_likelihood.variational_expectations(F_sw, Fvar_sw, Y_sw)
    results = [
        lik.variational_expectations(f, fvar, y)
        for lik, y, f, fvar in zip(likelihoods, Y_list, F_list, Fvar_list)
    ]
    assert_allclose(switched_results, np.concatenate(results)[Y_perm, :])


@pytest.mark.parametrize('num_latent', [1, 2])
def test_switched_likelihood_regression_valid_num_latent(num_latent):
    """
    A regression test for the SwitchedLikelihood: the number of latent
    functions in a GP model must be equal to the number of columns in Y minus
    one. The final column of Y is used to index the switch. If the number of
    latent functions does not match, an exception will be raised.
    """
    x = np.random.randn(100, 1)
    y = np.hstack((np.random.randn(100, 1), np.random.randint(0, 3, (100, 1))))
    data = x, y

    Z = InducingPoints(np.random.randn(num_latent, 1))
    likelihoods = [StudentT()] * 3
    switched_likelihood = SwitchedLikelihood(likelihoods)
    m = gpflow.models.SVGP(kernel=gpflow.kernels.Matern12(),
                           inducing_variable=Z,
                           likelihood=switched_likelihood,
                           num_latent=num_latent)
    if num_latent == 1:
        m.log_likelihood(data)
    else:
        with pytest.raises(tf.errors.InvalidArgumentError):
            m.log_likelihood(data)