from typing import Tuple

import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp

import gpflow
from gpflow import Parameter, covariances, kullback_leiblers
from gpflow.conditionals.util import expand_independent_outputs, mix_latent_gp

# from gpflow.conditionals import conditional
from gpflow.config import default_float, default_jitter
from gpflow.models.model import GPModel, InputData, MeanAndVariance, RegressionData
from gpflow.models.training_mixins import ExternalDataTrainingLossMixin
from gpflow.models.util import inducingpoint_wrapper
from gpflow.utilities import positive, triangular


class DiagNormal(gpflow.Module):
    def __init__(self, q_mu, q_sqrt):
        super().__init__()
        self.q_mu = gpflow.Parameter(q_mu)  # [M, L]
        self.q_sqrt = gpflow.Parameter(q_sqrt)  # [M, L]


class MvnNormal(gpflow.Module):
    def __init__(self, q_mu, q_sqrt):
        super().__init__()
        self.q_mu = gpflow.Parameter(q_mu)  # [M, L]
        self.q_sqrt = gpflow.Parameter(q_sqrt, transform=gpflow.utilities.triangular())  # [L, M, M]


def eye_like(A):
    return tf.eye(tf.shape(A)[-1], dtype=A.dtype)


# Design sketch for a dispatch-based implementation (not used below):
"""
@dispatch
def cond_precompute(kernel, iv, q_mu, q_sqrt, whiten):
    pass


@dispatch
def cond_execute(Xnew, kernel, iv, alpha, Qinv):
    pass


@dispatch
def conditional(kernel, iv, q_mu, q_sqrt, whiten):
    # precompute
    def execute(Xnew, full_cov):
        pass

    return execute
"""


class Posterior:
    def __init__(self, kernel, iv, q_dist, whiten=True, mean_function=None):
        self.iv = iv
        self.kernel = kernel
        self.q_dist = q_dist
        self.mean_function = mean_function
        self.whiten = whiten
        self._precompute()  # populates self.alpha and self.Qinv

    def freeze(self):
        """
        Note: this simply cuts the computational graph, turning alpha and Qinv into constants.
        """
        self.alpha = tf.constant(self.alpha.numpy())
        self.Qinv = tf.constant(self.Qinv.numpy())

    def predict_f(
        self, Xnew, full_cov: bool = False, full_output_cov: bool = False
    ) -> MeanAndVariance:
        # Qinv: [L, M, M]
        # alpha: [M, L]
        Kuf = covariances.Kuf(self.iv, self.kernel, Xnew)  # [(R), M, N]
        mean = tf.matmul(Kuf, self.alpha, transpose_a=True)
        if Kuf.shape.ndims == 3:
            mean = tf.einsum("...rn->...nr", tf.squeeze(mean, axis=-1))

        if isinstance(
            self.kernel, (gpflow.kernels.SeparateIndependent, gpflow.kernels.IndependentLatent)
        ):
            Knn = tf.stack([k(Xnew, full_cov=full_cov) for k in self.kernel.kernels], axis=0)
        elif isinstance(self.kernel, gpflow.kernels.MultioutputKernel):
            Knn = self.kernel.kernel(Xnew, full_cov=full_cov)
        else:
            Knn = self.kernel(Xnew, full_cov=full_cov)

        if full_cov:
            Kfu_Qinv_Kuf = tf.matmul(Kuf, tf.matmul(self.Qinv, Kuf), transpose_a=True)
            cov = Knn - Kfu_Qinv_Kuf
        else:
            # [AT B]_ij = AT_ik B_kj = A_ki B_kj
            # TODO check whether einsum is faster now?
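            # Only the diagonal of Kufᵀ Qinv Kuf is needed when full_cov=False, so we
            # sum over the M axis directly instead of forming the full [N, N] product.
            # (An einsum such as tf.einsum("...mn,...mn->...n", Kuf, tf.matmul(self.Qinv, Kuf))
            # should be equivalent -- noted here for the TODO above, not benchmarked.)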
            Kfu_Qinv_Kuf = tf.reduce_sum(Kuf * tf.matmul(self.Qinv, Kuf), axis=-2)
            cov = Knn - Kfu_Qinv_Kuf
            cov = tf.linalg.adjoint(cov)

        if isinstance(self.kernel, gpflow.kernels.LinearCoregionalization):
            cov = expand_independent_outputs(cov, full_cov, full_output_cov=False)
            mean, cov = mix_latent_gp(self.kernel.W, mean, cov, full_cov, full_output_cov)
        else:
            cov = expand_independent_outputs(cov, full_cov, full_output_cov)

        return mean + self.mean_function(Xnew), cov

    def _precompute(self):
        Kuu = covariances.Kuu(self.iv, self.kernel, jitter=default_jitter())  # [(R), M, M]
        L = tf.linalg.cholesky(Kuu)

        q_mu = self.q_dist.q_mu
        if Kuu.shape.ndims == 3:
            q_mu = tf.einsum("...mr->...rm", self.q_dist.q_mu)[..., None]  # [..., R, M, 1]

        if not self.whiten:
            # alpha = Kuu⁻¹ q_mu
            alpha = tf.linalg.cholesky_solve(L, q_mu)
        else:
            # alpha = L⁻T q_mu
            alpha = tf.linalg.triangular_solve(L, q_mu, adjoint=True)

        # predictive mean = Kfu alpha
        # predictive variance = Kff - Kfu Qinv Kuf
        # S = q_sqrt q_sqrtT
        if not self.whiten:
            # Qinv = Kuu⁻¹ - Kuu⁻¹ S Kuu⁻¹
            #      = Kuu⁻¹ - L⁻T L⁻¹ S L⁻T L⁻¹
            #      = L⁻T (I - L⁻¹ S L⁻T) L⁻¹
            #      = L⁻T B L⁻¹
            if isinstance(self.q_dist, DiagNormal):
                q_sqrt = tf.linalg.diag(tf.linalg.adjoint(self.q_dist.q_sqrt))
            else:
                q_sqrt = self.q_dist.q_sqrt
            Linv_qsqrt = tf.linalg.triangular_solve(L, q_sqrt)
            Linv_cov_u_LinvT = tf.matmul(Linv_qsqrt, Linv_qsqrt, transpose_b=True)
        else:
            # Qinv = Kuu⁻¹ - L⁻T S L⁻¹
            if isinstance(self.q_dist, DiagNormal):
                Linv_cov_u_LinvT = tf.linalg.diag(tf.linalg.adjoint(self.q_dist.q_sqrt ** 2))
            else:
                q_sqrt = self.q_dist.q_sqrt
                Linv_cov_u_LinvT = tf.matmul(q_sqrt, q_sqrt, transpose_b=True)

        # (Kuu⁻¹ could be formed explicitly as Linv.T @ Linv with Linv = solve(L, I),
        # but the triangular solves below avoid materialising it.)
        I = eye_like(Linv_cov_u_LinvT)
        B = I - Linv_cov_u_LinvT
        LinvT_B = tf.linalg.triangular_solve(L, B, adjoint=True)
        B_Linv = tf.linalg.adjoint(LinvT_B)
        Qinv = tf.linalg.triangular_solve(L, B_Linv, adjoint=True)

        self.alpha = Parameter(alpha, trainable=False)
        self.Qinv = Parameter(Qinv, trainable=False)
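

# Illustrative only (not part of the model code): a small NumPy check of the identity
# used in Posterior._precompute for the unwhitened case,
#     Kuu⁻¹ - Kuu⁻¹ S Kuu⁻¹ = L⁻T (I - L⁻¹ S L⁻T) L⁻¹,   with S = q_sqrt q_sqrtT.
# The helper name below is made up for this sketch and is not called anywhere.
def _check_qinv_identity(M=4, seed=0):
    rng = np.random.default_rng(seed)
    A = rng.standard_normal((M, M))
    Kuu = A @ A.T + M * np.eye(M)  # a well-conditioned SPD stand-in for Kuu
    q_sqrt = np.tril(rng.standard_normal((M, M)))
    S = q_sqrt @ q_sqrt.T
    L = np.linalg.cholesky(Kuu)
    Kinv = np.linalg.inv(Kuu)
    Linv = np.linalg.inv(L)
    lhs = Kinv - Kinv @ S @ Kinv
    rhs = Linv.T @ (np.eye(M) - Linv @ S @ Linv.T) @ Linv
    np.testing.assert_allclose(lhs, rhs, atol=1e-10)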


class NewSVGP(GPModel, ExternalDataTrainingLossMixin):
    """
    Differences from gpflow.models.SVGP:
    - q_dist instead of q_mu/q_sqrt
    - posterior() method
    """

    def __init__(
        self,
        kernel,
        likelihood,
        inducing_variable,
        *,
        mean_function=None,
        num_latent_gps: int = 1,
        q_diag: bool = False,
        q_mu=None,
        q_sqrt=None,
        whiten: bool = True,
        num_data=None,
    ):
        super().__init__(kernel, likelihood, mean_function, num_latent_gps)
        self.num_data = num_data
        self.q_diag = q_diag
        self.whiten = whiten
        self.inducing_variable = inducingpoint_wrapper(inducing_variable)

        # init variational parameters
        num_inducing = self.inducing_variable.num_inducing
        self.q_dist = self._init_variational_parameters(num_inducing, q_mu, q_sqrt, q_diag)

    def posterior(self, freeze=False):
        """
        If freeze=True, the computational graph is cut after precomputing alpha and Qinv.
        This works around some issues in TensorFlow's graph optimisation and gives much
        faster prediction when the returned object's predict_f is wrapped inside tf.function().
        """
        posterior = Posterior(
            self.kernel,
            self.inducing_variable,
            self.q_dist,
            whiten=self.whiten,
            mean_function=self.mean_function,
        )
        if freeze:
            posterior.freeze()
        return posterior

    def predictor(self):
        # NOTE: conditional_closure is not defined in this script; this method is a
        # placeholder for the dispatch-based design sketched near the top of the file.
        return conditional_closure(
            self.kernel,
            self.inducing_variable,
            self.q_dist,
            whiten=self.whiten,
            mean_function=self.mean_function,
        )

    def _init_variational_parameters(self, num_inducing, q_mu, q_sqrt, q_diag):
        q_mu = np.zeros((num_inducing, self.num_latent_gps)) if q_mu is None else q_mu
        q_mu = Parameter(q_mu, dtype=default_float())  # [M, P]

        if q_diag:
            if q_sqrt is None:
                q_sqrt = np.ones((num_inducing, self.num_latent_gps), dtype=default_float())
            else:
                assert q_sqrt.ndim == 2
                self.num_latent_gps = q_sqrt.shape[1]
            q_sqrt = Parameter(q_sqrt, transform=positive())  # [M, L|P]
            return DiagNormal(q_mu, q_sqrt)
        else:
            if q_sqrt is None:
                q_sqrt = np.array(
                    [
                        np.eye(num_inducing, dtype=default_float())
                        for _ in range(self.num_latent_gps)
                    ]
                )
            else:
                assert q_sqrt.ndim == 3
                self.num_latent_gps = q_sqrt.shape[0]
                num_inducing = q_sqrt.shape[1]
            q_sqrt = Parameter(q_sqrt, transform=triangular())  # [L|P, M, M]
            return MvnNormal(q_mu, q_sqrt)

    def prior_kl(self) -> tf.Tensor:
        return kullback_leiblers.prior_kl(
            self.inducing_variable,
            self.kernel,
            self.q_dist.q_mu,
            self.q_dist.q_sqrt,
            whiten=self.whiten,
        )

    def maximum_log_likelihood_objective(self, data: RegressionData) -> tf.Tensor:
        return self.elbo(data)

    def elbo(self, data: RegressionData) -> tf.Tensor:
        """
        Gives a variational bound (the evidence lower bound or ELBO) on the log
        marginal likelihood of the model.
        """
        X, Y = data
        kl = self.prior_kl()
        f_mean, f_var = self.posterior().predict_f(X, full_cov=False, full_output_cov=False)
        # f_mean, f_var = self.predictor()(X, full_cov=False, full_output_cov=False)  # needs conditional_closure
        var_exp = self.likelihood.variational_expectations(f_mean, f_var, Y)
        if self.num_data is not None:
            num_data = tf.cast(self.num_data, kl.dtype)
            minibatch_size = tf.cast(tf.shape(X)[0], kl.dtype)
            scale = num_data / minibatch_size
        else:
            scale = tf.cast(1.0, kl.dtype)
        return tf.reduce_sum(var_exp) * scale - kl

    def predict_f(self, Xnew: InputData, full_cov=False, full_output_cov=False) -> MeanAndVariance:
        """
        Kept for backwards compatibility.

        For fast prediction, get a posterior object first via model.posterior()
        (see its freeze argument) and then call posterior.predict_f(Xnew, ...).
        """
        return self.posterior(freeze=False).predict_f(
            # return self.predictor()(
            Xnew,
            full_cov=full_cov,
            full_output_cov=full_output_cov,
        )


"""
# Option 1: recomputing from scratch
m = SVGP(...)
# optimize model
p1 = m.posterior(freeze=True)
p2 = m.posterior(freeze=True)  # recomputed - we don't want that
# optimize some more
p3 = m.posterior(freeze=True)  # recomputes - we want that
p4 = m.posterior(freeze=True)  # recomputes as well - we don't want that

# Option 2: caching directly
m = SVGP(...)
# optimize model
p1 = m.posterior(freeze=True)
p2 = m.posterior(freeze=True)  # does not recompute - we want that
# optimize some more
p3 = m.posterior(freeze=True)  # still the old version!? - we don't want that

property: m.predict_f  # already does the computation
predfn = m.predict_f  # -> function object
predfn()
# ^ equivalent to
m.predict_f()
"""
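

# Illustrative sketch only: one way to get the "Option 2" caching behaviour discussed
# above while still allowing a refresh after further optimisation. The names
# CachedPosteriorMixin, cached_posterior and clear_posterior_cache are made up here
# and are not part of NewSVGP or GPflow.
class CachedPosteriorMixin:
    _posterior_cache = None

    def cached_posterior(self):
        # Reuse the frozen posterior until the cache is explicitly cleared.
        if self._posterior_cache is None:
            self._posterior_cache = self.posterior(freeze=True)
        return self._posterior_cache

    def clear_posterior_cache(self):
        # Call this after (re-)optimising the model, so the next cached_posterior()
        # call recomputes alpha and Qinv.
        self._posterior_cache = None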


def make_models(M=64, D=5, L=3, q_diag=False, whiten=True, mo=None):
    if mo is None:
        k = gpflow.kernels.Matern52()
        Z = np.random.randn(M, D)
    else:
        kernel_type, iv_type = mo
        k_list = [gpflow.kernels.Matern52() for _ in range(L)]
        w = tf.Variable(initial_value=np.random.rand(2, L), dtype=tf.float64, name="w")
        if kernel_type == "LinearCoregionalization":
            k = gpflow.kernels.LinearCoregionalization(k_list, W=w)
        elif kernel_type == "SeparateIndependent":
            k = gpflow.kernels.SeparateIndependent(k_list)
        elif kernel_type == "SharedIndependent":
            k = gpflow.kernels.SharedIndependent(k_list[0], output_dim=2)

        iv_list = [
            gpflow.inducing_variables.InducingPoints(np.random.randn(M, D)) for _ in range(L)
        ]
        if iv_type == "SeparateIndependent":
            Z = gpflow.inducing_variables.SeparateIndependentInducingVariables(iv_list)
        elif iv_type == "SharedIndependent":
            Z = gpflow.inducing_variables.SharedIndependentInducingVariables(iv_list[0])

    lik = gpflow.likelihoods.Gaussian(0.1)
    q_mu = np.random.randn(M, L)
    if q_diag:
        q_sqrt = np.random.randn(M, L) ** 2
    else:
        q_sqrt = np.tril(np.random.randn(L, M, M))

    mold = gpflow.models.SVGP(k, lik, Z, q_diag=q_diag, q_mu=q_mu, q_sqrt=q_sqrt, whiten=whiten)
    mnew = NewSVGP(k, lik, Z, q_diag=q_diag, q_mu=q_mu, q_sqrt=q_sqrt, whiten=whiten)
    return mold, mnew


# TODO: compare timings for q_diag=True, whiten=False, ...

mold, mnew = make_models(q_diag=False, mo=None)

X = np.random.randn(100, 5)
Xt = tf.convert_to_tensor(X)

pred_old = tf.function(mold.predict_f)
pred_newfrozen = tf.function(mnew.posterior(freeze=True).predict_f)
pred_new = tf.function(mnew.posterior(freeze=False).predict_f)


def predict_f_once(Xnew):
    return mnew.posterior().predict_f(Xnew)


pred_new_once = tf.function(predict_f_once)

# pred_new = tf.function(mnew.predictor())

# %timeit pred_old(Xt)
# %timeit pred_new(Xt)


def test_correct():
    from itertools import product

    conf = product(*[(True, False)] * 3)
    for q_diag, white, use_mo in conf:
        if use_mo:
            mo_options = list(
                product(
                    ("LinearCoregionalization", "SeparateIndependent", "SharedIndependent"),
                    ("SeparateIndependent", "SharedIndependent"),
                )
            )
        else:
            mo_options = [None]
        for mo in mo_options:
            print(f"Testing q_diag={q_diag} white={white} mo={mo}...")
            mold, mnew = make_models(q_diag=q_diag, whiten=white, mo=mo)

            mu, var = mnew.predict_f(X, full_cov=True, full_output_cov=True)
            mu2, var2 = mold.predict_f(X, full_cov=True, full_output_cov=True)
            np.testing.assert_allclose(mu, mu2)
            np.testing.assert_allclose(var, var2)

            mu, var = mnew.predict_f(X, full_cov=True, full_output_cov=False)
            mu2, var2 = mold.predict_f(X, full_cov=True, full_output_cov=False)
            np.testing.assert_allclose(mu, mu2)
            np.testing.assert_allclose(var, var2)

            mu, var = mnew.predict_f(X, full_cov=False, full_output_cov=True)
            mu2, var2 = mold.predict_f(X, full_cov=False, full_output_cov=True)
            np.testing.assert_allclose(mu, mu2)
            np.testing.assert_allclose(var, var2)

            mu, var = mnew.predict_f(X, full_cov=False, full_output_cov=False)
            mu2, var2 = mold.predict_f(X, full_cov=False, full_output_cov=False)
            np.testing.assert_allclose(mu, mu2)
            np.testing.assert_allclose(var, var2)


test_correct()
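

# Rough timing comparison: a plain-Python alternative to the %timeit magics above
# (illustrative only; absolute numbers depend on hardware and TF version, and the
# helper name _bench is made up for this sketch).
import timeit


def _bench(fn, n=100):
    fn(Xt)  # trigger tf.function tracing before timing
    return timeit.timeit(lambda: fn(Xt), number=n) / n


if __name__ == "__main__":
    for name, fn in [("old", pred_old), ("new", pred_new), ("new frozen", pred_newfrozen)]:
        print(f"{name}: {_bench(fn):.6f} s per call")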