Revision f4ac4c5a52603afdd12c258958456bd170df7092 authored by ST John on 24 March 2022, 08:32:27 UTC, committed by ST John on 24 March 2022, 08:32:27 UTC
1 parent cc7ed07
test_bo_integration.py
# Copyright 2022 The GPflow Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import (
Any,
Callable,
DefaultDict,
Generic,
Iterator,
List,
Set,
Tuple,
Type,
TypeVar,
cast,
)
import numpy as np
import pytest
import tensorflow as tf
from _pytest.fixtures import SubRequest
import gpflow
from gpflow.base import RegressionData
from gpflow.config import default_float
from gpflow.inducing_variables import InducingPoints, InducingVariables
from gpflow.kernels import Kernel, Matern52
from gpflow.likelihoods import Exponential, Likelihood
from gpflow.models import GPR, SGPR, SVGP, VGP, GPModel, training_loss_closure
from gpflow.models.vgp import update_vgp_data
from gpflow.posteriors import AbstractPosterior, PrecomputeCacheType
# Model type produced by a factory; covariant so that a factory producing a
# `GPModel` subclass can be used wherever a factory of `GPModel` is expected.
_M = TypeVar("_M", bound=GPModel, covariant=True)
# A function that builds a model from a regression dataset.
_CreateModel = Callable[[RegressionData], _M]
# NOTE: `_ModelFactory` would ideally be a `dataclass`, but mypy gets confused
# by `create_model` being a function-typed member that does not take `self`.
class _ModelFactory(Generic[_M]):
    """Bundles a model-construction function with its test configuration.

    Instances are registered in `_MODEL_FACTORIES` and drive the
    parametrization of the integration tests below.
    """

    def __init__(
        self,
        create_model: _CreateModel[_M],
        multi_output: bool,
        atol: float,
        rtol: float,
    ) -> None:
        # Callable that builds the model under test from a dataset.
        self.create_model = create_model
        # Whether the created model has multiple outputs.
        self.multi_output = multi_output
        # Absolute / relative tolerances when comparing predictions.
        self.atol = atol
        self.rtol = rtol
# All factories registered via the `model_factory` decorator below; used to
# parametrize the `_model_factory` fixture.
_MODEL_FACTORIES: List[_ModelFactory[Any]] = []
# This exists to make it easy to disable tf.function, for debugging.
_COMPILE = True
# Maximum number of optimizer iterations per `_optimize` call.
_MAXITER = 10
# Default tolerances for comparing compiled against eager predictions.
_DEFAULT_ATOL = 1e-10
_DEFAULT_RTOL = 1e-7
@pytest.fixture(name="register_posterior_bo_integration_test")
def _register_posterior_bo_integration_test(
    request: SubRequest,
    tested_posteriors: DefaultDict[str, Set[Type[AbstractPosterior]]],
) -> Callable[[AbstractPosterior], None]:
    """Provide a callback that records which posterior class a test exercised.

    Posterior classes are grouped under the name of the requesting test
    function in the shared `tested_posteriors` mapping.
    """

    def _record(posterior: AbstractPosterior) -> None:
        tested_posteriors[request.function.__name__].add(type(posterior))

    return _record
def model_factory(
    multi_output: bool = False, atol: float = _DEFAULT_ATOL, rtol: float = _DEFAULT_RTOL
) -> Callable[[_CreateModel[_M]], _ModelFactory[_M]]:
    """Decorator that registers a model-creation function in `_MODEL_FACTORIES`."""

    def register(create_model: _CreateModel[_M]) -> _ModelFactory[_M]:
        # Note: `register` rebinds the decorated name to the factory object,
        # not to the original function.
        factory = _ModelFactory(create_model, multi_output, atol, rtol)
        _MODEL_FACTORIES.append(factory)
        return factory

    return register
def create_kernel() -> Kernel:
    """Build the kernel shared by all model factories in this file."""
    kernel = Matern52()
    return kernel
def create_likelihood() -> Likelihood:
    """Build the likelihood shared by all model factories in this file."""
    likelihood = Exponential()
    return likelihood
def create_inducing_points(data: RegressionData) -> InducingPoints:
    """Create a small, reproducible set of inducing points.

    The inducing inputs have the same number of features as `data`'s inputs.
    """
    X, _ = data
    n_inducing_points = 5
    rng = np.random.default_rng(20220208)
    Z = tf.constant(rng.random((n_inducing_points, X.shape[1])))
    return InducingPoints(Z)
def create_q(
    inducing_variable: InducingVariables, *, row_scale: int = 1, column_scale: int = 1
) -> Tuple[bool, tf.Tensor, tf.Tensor]:
    """Create reproducible `(q_diag, q_mu, q_sqrt)` variational parameters.

    `row_scale` / `column_scale` multiply the number of rows / columns of the
    parameters, to accommodate multi-output configurations.
    """
    rng = np.random.default_rng(20220133)
    n_rows = row_scale * inducing_variable.num_inducing
    q_mu = tf.constant(rng.random((n_rows, column_scale)))
    # Squared so the (diagonal) q_sqrt entries are non-negative.
    q_sqrt = tf.constant(rng.random((n_rows, column_scale))) ** 2
    return True, q_mu, q_sqrt
@model_factory(rtol=1e-3)
def create_gpr(data: RegressionData) -> GPR:
    """Exact GP regression model."""
    kernel = create_kernel()
    return GPR(data=data, kernel=kernel)
@model_factory(rtol=1e-4)
def create_sgpr(data: RegressionData) -> SGPR:
    """Sparse GP regression model with inducing points derived from `data`."""
    inducing = create_inducing_points(data)
    return SGPR(data=data, kernel=create_kernel(), inducing_variable=inducing)
@model_factory(rtol=5e-3)
def create_vgp(data: RegressionData) -> VGP:
    """Variational GP model with a non-Gaussian (Exponential) likelihood."""
    kernel = create_kernel()
    likelihood = create_likelihood()
    return VGP(data=data, kernel=kernel, likelihood=likelihood)
@model_factory()
def create_svgp__independent_single_output(data: RegressionData) -> SVGP:
    """SVGP with a single-output kernel and plain inducing points."""
    iv = create_inducing_points(data)
    q_diag, q_mu, q_sqrt = create_q(iv)
    return SVGP(
        kernel=create_kernel(),
        likelihood=create_likelihood(),
        inducing_variable=iv,
        q_diag=q_diag,
        q_mu=q_mu,
        q_sqrt=q_sqrt,
    )
@model_factory(multi_output=True)
def create_svgp__fully_correlated_multi_output(data: RegressionData) -> SVGP:
    """SVGP pairing a shared-independent multi-output kernel with plain
    `InducingPoints` (no multi-output wrapper); per its name, this combination
    is expected to exercise the fully-correlated posterior path.
    """
    n_outputs = data[1].shape[1]
    kernel = gpflow.kernels.SharedIndependent(create_kernel(), output_dim=n_outputs)
    iv = create_inducing_points(data)
    # Fully-correlated parameterization: q has (num_inducing * n_outputs) rows.
    q_diag, q_mu, q_sqrt = create_q(iv, row_scale=n_outputs)
    return SVGP(
        kernel=kernel,
        likelihood=create_likelihood(),
        inducing_variable=iv,
        q_diag=q_diag,
        q_mu=q_mu,
        q_sqrt=q_sqrt,
    )
@model_factory(multi_output=True)
def create_svgp__independent_multi_output(data: RegressionData) -> SVGP:
    """SVGP with a shared-independent kernel and shared-independent inducing
    variables — one column of q parameters per output.
    """
    n_outputs = data[1].shape[1]
    kernel = gpflow.kernels.SharedIndependent(create_kernel(), output_dim=n_outputs)
    iv = gpflow.inducing_variables.SharedIndependentInducingVariables(
        create_inducing_points(data)
    )
    q_diag, q_mu, q_sqrt = create_q(iv, column_scale=n_outputs)
    return SVGP(
        kernel=kernel,
        likelihood=create_likelihood(),
        inducing_variable=iv,
        q_diag=q_diag,
        q_mu=q_mu,
        q_sqrt=q_sqrt,
    )
@model_factory(multi_output=True)
def create_svgp__fallback_independent_latent_posterior(data: RegressionData) -> SVGP:
    """SVGP with a linear-coregionalization kernel (one latent GP) and
    `FallbackSeparateIndependentInducingVariables`; per its name, this is
    expected to exercise the fallback independent-latent posterior path.
    """
    n_outputs = data[1].shape[1]
    rng = np.random.default_rng(20220131)
    # Single latent process mixed into n_outputs outputs by W.
    kernel = gpflow.kernels.LinearCoregionalization(
        [create_kernel()],
        W=tf.constant(rng.standard_normal((n_outputs, 1))),
    )
    iv = gpflow.inducing_variables.FallbackSeparateIndependentInducingVariables(
        [create_inducing_points(data)]
    )
    q_diag, q_mu, q_sqrt = create_q(iv)
    return SVGP(
        kernel=kernel,
        likelihood=create_likelihood(),
        inducing_variable=iv,
        q_diag=q_diag,
        q_mu=q_mu,
        q_sqrt=q_sqrt,
    )
@model_factory(multi_output=True)
def create_svgp__linear_coregionalization(data: RegressionData) -> SVGP:
    """SVGP with a linear-coregionalization kernel (one latent GP) and
    shared-independent inducing variables.
    """
    n_outputs = data[1].shape[1]
    rng = np.random.default_rng(20220131)
    # Single latent process mixed into n_outputs outputs by W.
    kernel = gpflow.kernels.LinearCoregionalization(
        [create_kernel()], W=tf.constant(rng.standard_normal((n_outputs, 1)))
    )
    iv = gpflow.inducing_variables.SharedIndependentInducingVariables(
        create_inducing_points(data)
    )
    q_diag, q_mu, q_sqrt = create_q(iv)
    return SVGP(
        kernel=kernel,
        likelihood=create_likelihood(),
        inducing_variable=iv,
        q_diag=q_diag,
        q_mu=q_mu,
        q_sqrt=q_sqrt,
    )
@pytest.fixture(params=_MODEL_FACTORIES)
def _model_factory(request: SubRequest) -> _ModelFactory[Any]:
    """Parametrize each test over every registered model factory."""
    factory = cast(_ModelFactory[Any], request.param)
    return factory
@pytest.fixture
def _f_minimum(_model_factory: _ModelFactory[Any]) -> tf.Tensor:
    """Minima of the test objective: one row per output, one column per feature."""
    if _model_factory.multi_output:
        return tf.constant(
            [
                [0.2, 0.4],
                [0.4, 0.6],
                [0.6, 0.8],
            ],
            dtype=default_float(),
        )
    return tf.constant([[0.3, 0.5]], dtype=default_float())
@pytest.fixture
def _f(_f_minimum: tf.Tensor) -> Callable[[tf.Tensor], tf.Tensor]:
    """Test objective: squared distance of each row of X to each minimum."""

    def f(X: tf.Tensor) -> tf.Tensor:
        # Broadcast to [n_rows, n_minima, n_features], then sum over features.
        diff = X[:, None, :] - _f_minimum[None, :, :]
        return tf.reduce_sum(diff ** 2, axis=-1)

    return f
@pytest.fixture
def _data(
    _f: Callable[[tf.Tensor], tf.Tensor], _f_minimum: tf.Tensor
) -> Tuple[tf.Variable, tf.Variable]:
    """Initial dataset as `tf.Variable`s with an unbounded leading dimension,
    so that rows can later be appended in place (see `_extend_data`).
    """
    n_initial_data = 3
    # `_f_minimum` has one row per output and one column per feature.
    n_outputs, n_features = _f_minimum.shape
    rng = np.random.default_rng(20220126)
    X = tf.Variable(
        rng.random((n_initial_data, n_features)),
        shape=[None, n_features],  # number of rows left dynamic
        dtype=default_float(),
        trainable=False,
    )
    Y = tf.Variable(
        _f(X),
        shape=[None, n_outputs],  # number of rows left dynamic
        dtype=default_float(),
        trainable=False,
    )
    return X, Y
@pytest.fixture
def _extend_data(
    _data: Tuple[tf.Variable, tf.Variable], _f: Callable[[tf.Tensor], tf.Tensor]
) -> Callable[[GPModel], Iterator[int]]:
    """Return a generator factory that grows the dataset one row at a time.

    Each iteration appends one random input (and its objective value) to the
    shared `_data` variables, then yields the iteration index.
    """
    n_iterations = 3
    rng = np.random.default_rng(20220127)
    X, Y = _data
    n_features = X.shape[1]
    def iterate(model: GPModel) -> Iterator[int]:
        for i in range(n_iterations):
            X_new = tf.constant(rng.random((1, n_features)))
            Y_new = _f(X_new)
            X_i = tf.concat([X, X_new], axis=0)
            Y_i = tf.concat([Y, Y_new], axis=0)
            if isinstance(model, VGP):
                # VGP holds its data internally, so it needs the dedicated
                # `update_vgp_data` helper rather than a variable assignment.
                update_vgp_data(model, (X_i, Y_i))
            else:
                # Other models read from the shared variables: grow in place.
                X.assign(X_i)
                Y.assign(Y_i)
            yield i
    return iterate
@pytest.fixture
def _X_new(_data: Tuple[tf.Variable, tf.Variable]) -> tf.Tensor:
    """Fresh query points for prediction, matching the data's feature count."""
    X, _ = _data
    rng = np.random.default_rng(20220128)
    return tf.constant(rng.random((3, X.shape[1])))
@pytest.fixture
def _optimize(_data: Tuple[tf.Variable, tf.Variable]) -> Callable[[GPModel], None]:
    """Return a function that runs a few BFGS steps on a model's training loss."""

    def optimize(model: GPModel) -> None:
        loss = training_loss_closure(model, _data, compile=_COMPILE)
        gpflow.optimizers.Scipy().minimize(
            loss,
            variables=model.trainable_variables,
            options=dict(maxiter=_MAXITER),
            method="BFGS",
            compile=_COMPILE,
        )

    return optimize
def test_posterior_bo_integration__predict_f(
    register_posterior_bo_integration_test: Callable[[AbstractPosterior], None],
    _model_factory: _ModelFactory[Any],
    _data: Tuple[tf.Variable, tf.Variable],
    _extend_data: Callable[[GPModel], Iterator[int]],
    _X_new: tf.Tensor,
) -> None:
    """
    Check that data added incrementally is correctly reflected in `predict_f`.
    """
    _X, Y = _data
    n_rows_new = _X_new.shape[0]
    n_outputs = Y.shape[1]
    model = _model_factory.create_model(_data)
    # A VARIABLE-cached posterior can refresh its precomputed state in place;
    # the loop below relies on that while reusing the same compiled predict_f.
    posterior = model.posterior(PrecomputeCacheType.VARIABLE)
    register_posterior_bo_integration_test(posterior)
    predict_f = posterior.predict_f
    if _COMPILE:
        predict_f = tf.function(predict_f)
    for _ in _extend_data(model):
        # Refresh the cache after each new data point, then predict.
        posterior.update_cache()
        compiled_mean, compiled_var = predict_f(_X_new)
        np.testing.assert_equal((n_rows_new, n_outputs), compiled_mean.shape)
        np.testing.assert_equal((n_rows_new, n_outputs), compiled_var.shape)
        # Reference: a fresh model built on the current data, evaluated eagerly.
        eager_model = _model_factory.create_model(_data)
        eager_mean, eager_var = eager_model.predict_f(_X_new)
        np.testing.assert_allclose(
            eager_mean, compiled_mean, rtol=_model_factory.rtol, atol=_model_factory.atol
        )
        np.testing.assert_allclose(
            eager_var, compiled_var, rtol=_model_factory.rtol, atol=_model_factory.atol
        )
def test_posterior_bo_integration__optimization(
    register_posterior_bo_integration_test: Callable[[AbstractPosterior], None],
    _model_factory: _ModelFactory[Any],
    _data: Tuple[tf.Variable, tf.Variable],
    _extend_data: Callable[[GPModel], Iterator[int]],
    _X_new: tf.Tensor,
    _optimize: Callable[[GPModel], None],
) -> None:
    """
    Check that data added incrementally is considered when optimizing a model.
    """
    _X, Y = _data
    n_rows_new = _X_new.shape[0]
    n_outputs = Y.shape[1]
    model = _model_factory.create_model(_data)
    # Variable-backed cache, so it can be refreshed after optimization below.
    posterior = model.posterior(PrecomputeCacheType.VARIABLE)
    register_posterior_bo_integration_test(posterior)
    predict_f = posterior.predict_f
    if _COMPILE:
        predict_f = tf.function(predict_f)
    # Add all the data first, and then `optimize`, so that both models are optimized the same number
    # of times and with the same data, so they converge to the same result.
    for _ in _extend_data(model):
        pass
    _optimize(model)
    # Refresh the cache so predictions reflect the optimized hyperparameters.
    posterior.update_cache()
    compiled_mean, compiled_var = predict_f(_X_new)
    np.testing.assert_equal((n_rows_new, n_outputs), compiled_mean.shape)
    np.testing.assert_equal((n_rows_new, n_outputs), compiled_var.shape)
    # Reference: a fresh model, optimized eagerly on the same (full) dataset.
    eager_model = _model_factory.create_model(_data)
    _optimize(eager_model)
    eager_mean, eager_var = eager_model.predict_f(_X_new)
    np.testing.assert_allclose(
        eager_mean, compiled_mean, rtol=_model_factory.rtol, atol=_model_factory.atol
    )
    np.testing.assert_allclose(
        eager_var, compiled_var, rtol=_model_factory.rtol, atol=_model_factory.atol
    )
Computing file changes ...