Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

Revision 1db48f3a735eb0fba06a7d503f080a7ead512604 authored by Artem Artemev on 11 July 2018, 12:50:44 UTC, committed by GitHub on 11 July 2018, 12:50:44 UTC
Update version.py file to 1.2.0 (#812)
1 parent 707b195
  • Files
  • Changes
  • 2109064
  • /
  • tests
  • /
  • test_expectations.py
Raw File Download

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • revision
  • directory
  • content
revision badge
swh:1:rev:1db48f3a735eb0fba06a7d503f080a7ead512604
directory badge
swh:1:dir:159581b27232f9d682bced21f4ae14b62cdb6e1b
content badge
swh:1:cnt:d3a1aa6ab55d3f742d02da249fbcda10bf7e1d3e

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • revision
  • directory
  • content
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
test_expectations.py
# Copyright 2018 the GPflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
import tensorflow as tf
import copy

import pytest

import gpflow
from gpflow.expectations import expectation, quadrature_expectation
from gpflow.probability_distributions import Gaussian, DiagonalGaussian, MarkovGaussian
from gpflow import kernels, mean_functions, features
from gpflow.test_util import session_tf
from gpflow.test_util import cache_tensor

from numpy.testing import assert_allclose

rng = np.random.RandomState(1)
RTOL = 1e-6


class Data:
    num_data = 5
    num_ind = 4
    D_in = 2
    D_out = 2

    Xmu = rng.randn(num_data, D_in)
    Xmu_markov = rng.randn(num_data + 1, D_in)  # (N+1)xD
    Xcov = rng.randn(num_data, D_in, D_in)
    Xcov = Xcov @ np.transpose(Xcov, (0, 2, 1))
    Z = rng.randn(num_ind, D_in)


@pytest.fixture
def feature(session_tf):
    return features.InducingPoints(Data.Z)


@cache_tensor
def gauss():
    return Gaussian(
        tf.convert_to_tensor(Data.Xmu),
        tf.convert_to_tensor(Data.Xcov))


@cache_tensor
def dirac_gauss():
    return Gaussian(
        tf.convert_to_tensor(Data.Xmu),
        tf.convert_to_tensor(np.zeros((Data.num_data, Data.D_in, Data.D_in))))


@cache_tensor
def gauss_diag():
    return DiagonalGaussian(
        tf.convert_to_tensor(Data.Xmu),
        tf.convert_to_tensor(rng.rand(Data.num_data, Data.D_in)))


@cache_tensor
def dirac_diag():
    return DiagonalGaussian(
        tf.convert_to_tensor(Data.Xmu),
        tf.convert_to_tensor(np.zeros((Data.num_data, Data.D_in))))


@cache_tensor
def markov_gauss():
    D_in = Data.D_in
    cov_params = rng.randn(Data.num_data + 1, D_in, 2 * D_in) / 2.  # (N+1)xDx2D
    Xcov = cov_params @ np.transpose(cov_params, (0, 2, 1))  # (N+1)xDxD
    Xcross = cov_params[:-1] @ np.transpose(cov_params[1:], (0, 2, 1))  # NxDxD
    Xcross = np.concatenate((Xcross, np.zeros((1, D_in, D_in))), 0)  # (N+1)xDxD
    Xcov = np.stack([Xcov, Xcross])  # 2x(N+1)xDxD
    return MarkovGaussian(
        tf.convert_to_tensor(Data.Xmu_markov),
        tf.convert_to_tensor(Xcov))


@cache_tensor
def dirac_markov_gauss():
    return MarkovGaussian(
        tf.convert_to_tensor(Data.Xmu_markov),
        tf.convert_to_tensor(np.zeros((2, Data.num_data + 1, Data.D_in, Data.D_in))))


@cache_tensor
def rbf_kern():
    return kernels.RBF(Data.D_in, variance=rng.rand(), lengthscales=rng.rand() + 1.)


@cache_tensor
def lin_kern():
    return kernels.Linear(Data.D_in, variance=rng.rand())


@cache_tensor
def matern_kern():
    return kernels.Matern32(Data.D_in, variance=rng.rand())


@cache_tensor
def rbf_lin_sum_kern():
    return kernels.Sum([
        kernels.RBF(Data.D_in, variance=rng.rand(), lengthscales=rng.rand() + 1.),
        kernels.Linear(Data.D_in, variance=rng.rand())
    ])


@cache_tensor
def rbf_lin_sum_kern2():
    return kernels.Sum([
        kernels.Linear(Data.D_in, variance=rng.rand()),
        kernels.RBF(Data.D_in, variance=rng.rand(), lengthscales=rng.rand() + 1.),
        kernels.Linear(Data.D_in, variance=rng.rand()),
        kernels.RBF(Data.D_in, variance=rng.rand(), lengthscales=rng.rand() + 1.),
    ])


@cache_tensor
def rbf_lin_prod_kern():
    return kernels.Product([
        kernels.RBF(1, variance=rng.rand(), lengthscales=rng.rand() + 1., active_dims=[0]),
        kernels.Linear(1, variance=rng.rand(), active_dims=[1])
    ])


@cache_tensor
def rbf_kern_act_dim_0():
    return kernels.RBF(1, variance=rng.rand(), lengthscales=rng.rand() + 1., active_dims=[0])


@cache_tensor
def rbf_kern_act_dim_1():
    return kernels.RBF(1, variance=rng.rand(), lengthscales=rng.rand() + 1., active_dims=[1])


@cache_tensor
def lin_kern_act_dim_0():
    return kernels.Linear(1, variance=rng.rand(), active_dims=[0])


@cache_tensor
def lin_kern_act_dim_1():
    return kernels.Linear(1, variance=rng.rand(), active_dims=[1])


@cache_tensor
def lin_mean():
    return mean_functions.Linear(A=rng.randn(Data.D_in, Data.D_out), b=rng.randn(Data.D_out))


@cache_tensor
def identity_mean():
    # Note: Identity can only be used if Din == Dout
    return mean_functions.Identity(input_dim=Data.D_in)


@cache_tensor
def const_mean():
    return mean_functions.Constant(c=rng.randn(Data.D_out))


@cache_tensor
def zero_mean():
    return mean_functions.Zero(output_dim=Data.D_out)


def _check(params):
    analytic = expectation(*params)
    quad = quadrature_expectation(*params)
    session = tf.get_default_session()
    analytic, quad = session.run([analytic, quad])
    assert_allclose(analytic, quad, rtol=RTOL)


# =================================== TESTS ===================================

@pytest.mark.parametrize("distribution", [gauss])
@pytest.mark.parametrize("mean1", [lin_mean, identity_mean, const_mean, zero_mean])
@pytest.mark.parametrize("mean2", [lin_mean, identity_mean, const_mean, zero_mean])
@pytest.mark.parametrize("arg_filter",
                         [lambda p, m1, m2: (p, m1),
                          lambda p, m1, m2: (p, m1, m2)])
def test_mean_function_only_expectations(session_tf, distribution, mean1, mean2, arg_filter):
    params = arg_filter(distribution(), mean1(), mean2())
    _check(params)


@pytest.mark.parametrize("distribution", [gauss, gauss_diag])
@pytest.mark.parametrize("kernel", [lin_kern, rbf_kern, rbf_lin_sum_kern, rbf_lin_prod_kern])
@pytest.mark.parametrize("arg_filter",
                         [lambda p, k, f: (p, k),
                          lambda p, k, f: (p, (k, f)),
                          lambda p, k, f: (p, (k, f), (k, f))])
def test_kernel_only_expectations(session_tf, distribution, kernel, feature, arg_filter):
    params = arg_filter(distribution(), kernel(), feature)
    _check(params)


@pytest.mark.parametrize("distribution", [gauss])
@pytest.mark.parametrize("kernel", [rbf_kern, lin_kern, matern_kern, rbf_lin_sum_kern])
@pytest.mark.parametrize("mean", [lin_mean, identity_mean, const_mean, zero_mean])
@pytest.mark.parametrize("arg_filter",
                         [lambda p, k, f, m: (p, (k, f), m),
                          lambda p, k, f, m: (p, m, (k, f))])
def test_kernel_mean_function_expectations(
        session_tf, distribution, kernel, feature, mean, arg_filter):
    params = arg_filter(distribution(), kernel(), feature, mean())
    _check(params)


@pytest.mark.parametrize("kernel", [lin_kern, rbf_kern, rbf_lin_sum_kern, rbf_lin_prod_kern])
def test_eKdiag_no_uncertainty(session_tf, kernel):
    eKdiag = expectation(dirac_diag(), kernel())
    Kdiag = kernel().Kdiag(Data.Xmu)
    eKdiag, Kdiag = session_tf.run([eKdiag, Kdiag])
    assert_allclose(eKdiag, Kdiag, rtol=RTOL)


@pytest.mark.parametrize("kernel", [lin_kern, rbf_kern, rbf_lin_sum_kern, rbf_lin_prod_kern])
def test_eKxz_no_uncertainty(session_tf, kernel, feature):
    eKxz = expectation(dirac_diag(), (kernel(), feature))
    Kxz = kernel().K(Data.Xmu, Data.Z)
    eKxz, Kxz = session_tf.run([eKxz, Kxz])
    assert_allclose(eKxz, Kxz, rtol=RTOL)


@pytest.mark.parametrize("kernel", [lin_kern, rbf_kern, rbf_lin_sum_kern])
@pytest.mark.parametrize("mean", [lin_mean, identity_mean, const_mean, zero_mean])
def test_eMxKxz_no_uncertainty(session_tf, kernel, feature, mean):
    exKxz = expectation(dirac_diag(), mean(), (kernel(), feature))
    Kxz = kernel().K(Data.Xmu, Data.Z)
    xKxz = expectation(dirac_gauss(), mean())[:, :, None] * Kxz[:, None, :]
    exKxz, xKxz = session_tf.run([exKxz, xKxz])
    assert_allclose(exKxz, xKxz, rtol=RTOL)


@pytest.mark.parametrize("kernel", [lin_kern, rbf_kern, rbf_lin_sum_kern, rbf_lin_prod_kern])
def test_eKzxKxz_no_uncertainty(session_tf, kernel, feature):
    kern = kernel()
    eKzxKxz = expectation(dirac_diag(), (kern, feature), (kern, feature))
    Kxz = kern.K(Data.Xmu, Data.Z)
    eKzxKxz, Kxz = session_tf.run([eKzxKxz, Kxz])
    KzxKxz = Kxz[:, :, None] * Kxz[:, None, :]
    assert_allclose(eKzxKxz, KzxKxz, rtol=RTOL)


def test_RBF_eKzxKxz_gradient_notNaN(session_tf):
    """
    Ensure that <K_{Z, x} K_{x, Z}>_p(x) is not NaN and correct, when
    K_{Z, Z} is zero with finite precision. See pull request #595.
    """
    kern = gpflow.kernels.RBF(1, lengthscales=0.1)
    kern.variance = 2.

    p = gpflow.probability_distributions.Gaussian(
        tf.constant([[10]], dtype=gpflow.settings.tf_float),
        tf.constant([[[0.1]]], dtype=gpflow.settings.tf_float))
    z = gpflow.features.InducingPoints([[-10.], [10.]])

    ekz = expectation(p, (kern, z), (kern, z))

    g, = tf.gradients(ekz, kern.lengthscales._unconstrained_tensor)
    grad = session_tf.run(g)
    assert grad is not None and not np.isnan(grad)


@pytest.mark.parametrize("kernel1", [rbf_kern_act_dim_0, lin_kern_act_dim_0])
@pytest.mark.parametrize("kernel2", [rbf_kern_act_dim_1, lin_kern_act_dim_1])
def test_eKzxKxz_separate_dims_simplification(
        session_tf, kernel1, kernel2, feature):
    _check((gauss_diag(), (kernel1(), feature), (kernel2(), feature)))


def test_eKzxKxz_different_sum_kernels(session_tf, feature):
    kern1, kern2 = rbf_lin_sum_kern(), rbf_lin_sum_kern2()
    _check((gauss(), (kern1, feature), (kern2, feature)))


def test_eKzxKxz_same_vs_different_sum_kernels(session_tf, feature):
    # check the result is the same if we pass different objects with the same value
    kern1 = rbf_lin_sum_kern2()
    kern2 = copy.copy(rbf_lin_sum_kern2())
    same = expectation(*(gauss(), (kern1, feature), (kern1, feature)))
    different = expectation(*(gauss(), (kern1, feature), (kern2, feature)))
    session = tf.get_default_session()
    same, different = session.run([same, different])
    assert_allclose(same, different, rtol=RTOL)


@pytest.mark.parametrize("kernel", [rbf_kern, lin_kern, rbf_lin_sum_kern])
def test_exKxz_markov(session_tf, kernel, feature):
    _check((markov_gauss(), (kernel(), feature), identity_mean()))


@pytest.mark.parametrize("kernel", [rbf_kern, lin_kern, rbf_lin_sum_kern])
def test_exKxz_markov_no_uncertainty(session_tf, kernel, feature):
    exKxz = expectation(dirac_markov_gauss(), (kernel(), feature), identity_mean())
    exKxz = session_tf.run(exKxz)
    Kzx = kernel().compute_K(Data.Xmu_markov[:-1, :], Data.Z)  # NxM
    xKxz = Kzx[..., None] * Data.Xmu_markov[1:, None, :]  # NxMxD
    assert_allclose(exKxz, xKxz, rtol=RTOL)


@pytest.mark.parametrize("distribution", [gauss, gauss_diag, markov_gauss])
def test_cov_shape_inference(session_tf, distribution, feature):
    gauss_tuple = (distribution().mu, distribution().cov)
    _check((gauss_tuple, (rbf_kern(), feature)))
    if isinstance(distribution(), MarkovGaussian):
        _check((gauss_tuple, None, (rbf_kern(), feature)))
The diff you're trying to view is too large. Only the first 1000 changed files have been loaded.
Showing with 0 additions and 0 deletions (0 / 0 diffs computed)
swh spinner

Computing file changes ...

back to top

Software Heritage — Copyright (C) 2015–2026, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Content policy— Contact— JavaScript license information— Web API