# Copyright 2016 the GPflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
import tensorflow as tf

from .. import kernels
from .. import likelihoods
from .. import features
from ..mean_functions import Zero
from ..expectations import expectation
from ..probability_distributions import DiagonalGaussian
from .model import GPModel
from .gpr import GPR

# The imports below assume a GPflow-2-style package layout; adjust the module
# paths if the installed GPflow version places these symbols elsewhere.
from ..base import Parameter
from ..config import default_float, default_jitter
from ..covariances import Kuf, Kuu
from ..utilities import positive


class GPLVM(GPR):
    """
    Standard GPLVM where the likelihood can be optimised with respect to the latent X.
    """

    def __init__(self, Y, latent_dim, X_mean=None, kern=None, mean_function=None, **kwargs):
        """
        Initialise GPLVM object. This method only works with a Gaussian likelihood.

        :param Y: data matrix, size N (number of points) x D (dimensions)
        :param latent_dim: number of latent dimensions Q
        :param X_mean: latent positions ([N, Q]), for the initialisation of the latent space.
        :param kern: kernel specification, by default RBF
        :param mean_function: mean function, by default None.
        """
        if mean_function is None:
            mean_function = Zero()
        if kern is None:
            kern = kernels.RBF(latent_dim, ARD=True)
        if X_mean is None:
            X_mean = PCA_reduce(Y, latent_dim)
        num_latent = X_mean.shape[1]
        if num_latent != latent_dim:
            msg = 'Passed in number of latent dimensions {0} does not match initial X {1}.'
            raise ValueError(msg.format(latent_dim, num_latent))
        if Y.shape[1] < num_latent:
            raise ValueError('More latent dimensions than observed.')
        GPR.__init__(self, X_mean, Y, kern, mean_function=mean_function, **kwargs)
        del self.X  # in GPLVM the latent positions are a trainable Parameter
        self.X = Parameter(X_mean)


class BayesianGPLVM(GPModel):
    def __init__(self, X_mean, X_var, Y, kern, M, Z=None, X_prior_mean=None, X_prior_var=None):
        """
        Initialise Bayesian GPLVM object. This method only works with a Gaussian likelihood.

        :param X_mean: initial latent positions, size N (number of points) x Q (latent dimensions).
        :param X_var: variance of latent positions ([N, Q]), for the initialisation of the latent space.
        :param Y: data matrix, size N (number of points) x D (dimensions)
        :param kern: kernel specification, by default RBF
        :param M: number of inducing points
        :param Z: matrix of inducing points, size M (inducing points) x Q (latent dimensions).
            By default a random permutation of X_mean.
        :param X_prior_mean: prior mean used in KL term of bound. By default 0. Same size as X_mean.
        :param X_prior_var: prior variance used in KL term of bound. By default 1.
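
        A minimal construction sketch (the data ``Y``, the initial variance of
        0.1 and the number of inducing points are hypothetical, not part of
        this module)::

            Q = 2
            X_init = PCA_reduce(Y, Q)
            m = BayesianGPLVM(X_mean=X_init,
                              X_var=0.1 * np.ones_like(X_init),
                              Y=Y,
                              kern=kernels.RBF(Q, ARD=True),
                              M=20)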
""" GPModel.__init__(self, X_mean, Y, kern, likelihood=likelihoods.Gaussian(), mean_function=Zero()) del self.X # in GPLVM this is a Param self.X_mean = Parameter(X_mean) # diag_transform = transforms.DiagMatrix(X_var.shape[1]) # self.X_var() = Parameter(diag_transform.forward(transforms.positive.backward(X_var)) if X_var.ndim == 2 else X_var, # diag_transform) assert X_var.ndim == 2 self.X_var = Parameter(X_var, transform=transforms.positive) self.num_data, self.num_latent = X_mean.shape self.output_dim = Y.shape[1] assert np.all(X_mean.shape == X_var.shape) assert X_mean.shape[0] == Y.shape[0], 'X mean and Y must be same size.' assert X_var.shape[0] == Y.shape[0], 'X var and Y must be same size.' # inducing points if Z is None: # By default we initialize by subset of initial latent points Z = np.random.permutation(X_mean.copy())[:M] self.feature = features.InducingPoints(Z) assert len(self.feature) == M assert X_mean.shape[1] == self.num_latent # deal with parameters for the prior mean variance of X if X_prior_mean is None: X_prior_mean = np.zeros((self.num_data, self.num_latent)) if X_prior_var is None: X_prior_var = np.ones((self.num_data, self.num_latent)) self.X_prior_mean = np.asarray(np.atleast_1d(X_prior_mean), dtype=default_float()) self.X_prior_var = np.asarray(np.atleast_1d(X_prior_var), dtype=default_float()) assert self.X_prior_mean.shape[0] == self.num_data assert self.X_prior_mean.shape[1] == self.num_latent assert self.X_prior_var.shape[0] == self.num_data assert self.X_prior_var.shape[1] == self.num_latent def _build_likelihood(self): """ Construct a tensorflow function to compute the bound on the marginal likelihood. """ pX = DiagonalGaussian(self.X_mean(), self.X_var()) num_inducing = len(self.feature) psi0 = tf.reduce_sum(expectation(pX, self.kern)) psi1 = expectation(pX, (self.kern, self.feature)) psi2 = tf.reduce_sum(expectation(pX, (self.kern, self.feature), (self.kern, self.feature)), axis=0) cov_uu = Kuu(self.feature, self.kern, jitter=default_jitter()) L = tf.linalg.cholesky(cov_uu) sigma2 = self.likelihood.variance sigma = tf.sqrt(sigma2) # Compute intermediate matrices A = tf.linalg.triangular_solve(L, tf.transpose(psi1), lower=True) / sigma tmp = tf.linalg.triangular_solve(L, psi2, lower=True) AAT = tf.linalg.triangular_solve(L, tf.transpose(tmp), lower=True) / sigma2 B = AAT + tf.eye(num_inducing, dtype=default_float()) LB = tf.linalg.cholesky(B) log_det_B = 2. * tf.reduce_sum(tf.math.log(tf.linalg.diag_part(LB))) c = tf.linalg.triangular_solve(LB, tf.linalg.matmul(A, self.Y), lower=True) / sigma # KL[q(x) || p(x)] dX_var = self.X_var() if len(self.X_var().get_shape()) == 2 else tf.linalg.diag_part(self.X_var()) NQ = tf.cast(tf.size(self.X_mean()), default_float()) D = tf.cast(tf.shape(self.Y)[1], default_float()) KL = -0.5 * tf.reduce_sum(tf.math.log(dX_var)) \ + 0.5 * tf.reduce_sum(tf.math.log(self.X_prior_var)) \ - 0.5 * NQ \ + 0.5 * tf.reduce_sum((tf.square(self.X_mean() - self.X_prior_mean) + dX_var) / self.X_prior_var) # compute log marginal bound ND = tf.cast(tf.size(self.Y), default_float()) bound = -0.5 * ND * tf.math.log(2 * np.pi * sigma2) bound += -0.5 * D * log_det_B bound += -0.5 * tf.reduce_sum(tf.square(self.Y)) / sigma2 bound += 0.5 * tf.reduce_sum(tf.square(c)) bound += -0.5 * D * (tf.reduce_sum(psi0) / sigma2 - tf.reduce_sum(tf.linalg.diag_part(AAT))) bound -= KL return bound def _build_predict(self, Xnew, full_cov=False): """ Compute the mean and variance of the latent function at some new points. 
        Note that this is very similar to the SGPR prediction, for which there are notes
        in the SGPR notebook.

        :param Xnew: points to predict at.
        """
        pX = DiagonalGaussian(self.X_mean, self.X_var)
        num_inducing = len(self.feature)

        psi1 = expectation(pX, (self.kern, self.feature))
        psi2 = tf.reduce_sum(expectation(pX, (self.kern, self.feature), (self.kern, self.feature)), axis=0)

        jitter = default_jitter()
        Kus = Kuf(self.feature, self.kern, Xnew)
        sigma2 = self.likelihood.variance
        sigma = tf.sqrt(sigma2)
        L = tf.linalg.cholesky(Kuu(self.feature, self.kern, jitter=jitter))

        A = tf.linalg.triangular_solve(L, tf.transpose(psi1), lower=True) / sigma
        tmp = tf.linalg.triangular_solve(L, psi2, lower=True)
        AAT = tf.linalg.triangular_solve(L, tf.transpose(tmp), lower=True) / sigma2
        B = AAT + tf.eye(num_inducing, dtype=default_float())
        LB = tf.linalg.cholesky(B)
        c = tf.linalg.triangular_solve(LB, tf.linalg.matmul(A, self.Y), lower=True) / sigma
        tmp1 = tf.linalg.triangular_solve(L, Kus, lower=True)
        tmp2 = tf.linalg.triangular_solve(LB, tmp1, lower=True)
        mean = tf.linalg.matmul(tmp2, c, transpose_a=True)
        if full_cov:
            var = self.kern(Xnew) + tf.linalg.matmul(tmp2, tmp2, transpose_a=True) \
                  - tf.linalg.matmul(tmp1, tmp1, transpose_a=True)
            shape = tf.stack([1, 1, tf.shape(self.Y)[1]])
            var = tf.tile(tf.expand_dims(var, 2), shape)
        else:
            # diagonal of the predictive covariance
            var = self.kern(Xnew, full_cov=False) + tf.reduce_sum(tf.square(tmp2), 0) \
                  - tf.reduce_sum(tf.square(tmp1), 0)
            shape = tf.stack([1, tf.shape(self.Y)[1]])
            var = tf.tile(tf.expand_dims(var, 1), shape)
        return mean + self.mean_function(Xnew), var


def PCA_reduce(X, Q):
    """
    A helpful function for linearly reducing the dimensionality of the data X to Q.

    :param X: data array of size N (number of points) x D (dimensions)
    :param Q: number of latent dimensions, Q <= D
    :return: PCA projection array of size [N, Q].
    """
    assert Q <= X.shape[1], 'Cannot have more latent dimensions than observed'
    evals, evecs = np.linalg.eigh(np.cov(X.T))
    W = evecs[:, -Q:]
    return (X - X.mean(0)).dot(W)
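

if __name__ == "__main__":
    # A minimal sanity-check sketch for PCA_reduce only (hypothetical random
    # data, not part of the original module). The model classes above are not
    # exercised here because their construction depends on the installed
    # GPflow version. Run via `python -m <package>.models.gplvm` so that the
    # relative imports resolve.
    X_demo = np.random.randn(100, 5)   # [N, D] observations
    X_proj = PCA_reduce(X_demo, 2)     # [N, 2] PCA projection
    assert X_proj.shape == (100, 2)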