https://github.com/GPflow/GPflow
Tip revision: 37db81f85896ff812533c6d4885feaaed7fff47d authored by ST John on 29 January 2020, 19:09:29 UTC
Merge branch 'develop' of github.com:GPflow/GPflow into st/reorder_covariances
test_mcmc_helper.py
import numpy as np
import pytest
import tensorflow as tf
import tensorflow_probability as tfp

import gpflow
from gpflow.utilities import to_default_float

np.random.seed(1)


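# Small 1-D regression dataset: a noisy sum of sinusoids with a constant offset.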
def build_data():
    N = 30
    X = np.random.rand(N, 1)
    Y = np.sin(12 * X) + 0.66 * np.cos(25 * X) + np.random.randn(N, 1) * 0.1 + 3
    return (X, Y)


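# GPR model with a Matern-5/2 kernel, a linear mean function, and a small
# initial likelihood variance.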
def build_model(data):
    kernel = gpflow.kernels.Matern52(lengthscale=0.3)

    meanf = gpflow.mean_functions.Linear(1.0, 0.0)
    model = gpflow.models.GPR(data, kernel, meanf)
    model.likelihood.variance.assign(0.01)
    return model


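# The helper should expose one unconstrained state tensor per trainable
# parameter, matching shapes and underlying variables.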
def test_mcmc_helper_parameters():
    data = build_data()
    model = build_model(data)

    hmc_helper = gpflow.optimizers.SamplingHelper(
        model.trainable_parameters, model.log_marginal_likelihood
    )

    for i, param in enumerate(model.trainable_parameters):
        assert param.shape == hmc_helper.current_state[i].shape
        assert param == hmc_helper._parameters[i]
        if isinstance(param, gpflow.Parameter):
            assert param.unconstrained_variable == hmc_helper.current_state[i]


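# The helper's target_log_prob_fn should track the model's log marginal
# likelihood, including after parameters are reassigned.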
def test_mcmc_helper_target_function():
    data = build_data()
    model = build_model(data)

    hmc_helper = gpflow.optimizers.SamplingHelper(
        model.trainable_parameters, model.log_marginal_likelihood
    )

    target_log_prob_fn = hmc_helper.target_log_prob_fn

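    # The helper's target density should equal the model's log marginal likelihood.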
    assert model.log_marginal_likelihood() == target_log_prob_fn()

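    # Reassigning a parameter should be picked up by the existing closure,
    # without rebuilding the helper.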
    model.likelihood.variance.assign(1)

    assert model.log_marginal_likelihood() == target_log_prob_fn()

    # Test the wrapped closure: the underlying function returns both the log
    # density and a gradient function, which should yield one gradient per
    # trainable parameter.
    log_prob, grad_fn = target_log_prob_fn.__original_wrapped__()
    grad, nones = grad_fn(1, [None] * len(model.trainable_parameters))
    assert len(grad) == len(model.trainable_parameters)
    assert nones == [None] * len(model.trainable_parameters)

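
# End-to-end check: run a short adaptive HMC chain from TFP on the helper's
# unconstrained state and verify the resulting sample shapes and final values.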
def test_mcmc_sampler_integration():
    data = build_data()
    model = build_model(data)

    hmc_helper = gpflow.optimizers.SamplingHelper(
        model.trainable_parameters, model.log_marginal_likelihood
    )

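    # Plain HMC kernel driven by the helper's TFP-compatible target function.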
    hmc = tfp.mcmc.HamiltonianMonteCarlo(
        target_log_prob_fn=hmc_helper.target_log_prob_fn,
        num_leapfrog_steps=2,
        step_size=0.01
    )

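    # Wrap the kernel with simple step-size adaptation; the target acceptance
    # probability is cast to GPflow's default float so dtypes match inside TFP.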
    adaptive_hmc = tfp.mcmc.SimpleStepSizeAdaptation(
        hmc,
        num_adaptation_steps=2,
        target_accept_prob=to_default_float(0.75),
        adaptation_rate=0.1
    )

    num_samples = 5

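    # Compile the sampling loop with tf.function so the chain runs in graph mode.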
    @tf.function
    def run_chain_fn():
        return tfp.mcmc.sample_chain(
            num_results=num_samples,
            num_burnin_steps=2,
            current_state=hmc_helper.current_state,
            kernel=adaptive_hmc,
            trace_fn=lambda _, pkr: pkr.inner_results.is_accepted
        )

    samples, _ = run_chain_fn()

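    # One chain of unconstrained samples per trainable parameter; converting
    # them back gives the corresponding constrained parameter values.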
    assert len(samples) == len(model.trainable_parameters)
    parameter_samples = hmc_helper.convert_constrained_values(samples)
    assert len(parameter_samples) == len(samples)

    for i in range(len(model.trainable_parameters)):
        assert len(samples[i]) == num_samples
        assert hmc_helper.current_state[i].numpy() == samples[i][-1]
        assert hmc_helper._parameters[i].numpy() == parameter_samples[i][-1]