# Source: https://github.com/GPflow/GPflow
# Tip revision: 816a41f081fb5f8d1c33aaf7b7866fcf393b0898 ("update models and parse"),
# authored by thevincentadam on 02 February 2020, 21:14:05 UTC
# File: svgps.py
import abc
from typing import Tuple, TypeVar

import numpy as np
import tensorflow as tf

from .. import kullback_leiblers
from ..base import Parameter
from ..conditionals import conditional, base_conditional
from ..config import default_float, default_jitter
from ..mean_functions import Zero
from ..utilities import ops, positive, triangular
from .model import BayesianModel
from .util import inducingpoint_wrapper
Data = TypeVar('Data', Tuple[tf.Tensor, tf.Tensor], tf.Tensor)
DataPoint = tf.Tensor
MeanAndVariance = Tuple[tf.Tensor, tf.Tensor]
class SVGPs(BayesianModel, metaclass=abc.ABCMeta):
def __init__(self,
kernels,
likelihood,
inducing_variables,
indices,
*,
mean_functions=None,
num_latent: int = 1,
q_diag: bool = False,
q_mus=None,
q_sqrts=None,
whiten: bool = True,
num_data=None,
deterministic_optimisation=False,
num_samples=10,
offsets_x=None,
offsets_y=None):
"""
- kernels, inducing_variables, mean_functions are appropriate
lists of GPflow objects
- likelihood is a GPflow object
- num_latent is the number of latent processes to use, defaults to 1
- q_diag is a boolean. If True, the covariance is approximated by a
diagonal matrix.
- whiten is a boolean. If True, we use the whitened representation of
the inducing points.
- num_data is the total number of observations, defaults to X.shape[0]
(relevant when feeding in external minibatches)
"""
# init the super class, accept args
super().__init__()
self.num_latent = num_latent
self.kernels = kernels
self.num_components = len(kernels)
if mean_functions is None:
mean_functions = [Zero() for _ in range(self.num_components)]
self.mean_functions = mean_functions
self.likelihood = likelihood
self.num_data = num_data
self.q_diag = q_diag
self.whiten = whiten
self.indices = indices
self.inducing_variables = \
[inducingpoint_wrapper(inducing_variable) for inducing_variable in inducing_variables]
# init variational parameters
num_inducings = \
[len(inducing_variable) for inducing_variable in inducing_variables]
self.deterministic_optimisation = deterministic_optimisation
self.num_samples = num_samples
self.num_inducings = num_inducings
self._init_variational_parameters(num_inducings, q_mus, q_sqrts, q_diag)
if offsets_x is None:
offsets_x = [np.zeros((1, 1)) for _ in range(self.num_components)]
self.offsets_x = [Parameter(o) for o in offsets_x]
if offsets_y is None:
offsets_y = [np.zeros((1, 1)) for _ in range(self.num_components)]
self.offsets_y = [Parameter(o) for o in offsets_y]
def _init_variational_parameters(self, num_inducings, q_mus, q_sqrts, q_diag):
self.q_mus, self.q_sqrts = [], []
for c in range(self.num_components):
q_mu = None if q_mus is None else q_mus[c]
q_sqrt = None if q_sqrts is None else q_sqrts[c]
q_mu_, q_sqrt_ = \
self._build_component_variational_parameters(num_inducings[c], q_mu, q_sqrt, q_diag)
self.q_mus.append(q_mu_)
self.q_sqrts.append(q_sqrt_)
def _build_component_variational_parameters(self, num_inducing, q_mu, q_sqrt, q_diag):
"""
        Constructs the mean and the Cholesky factor of the covariance of the variational
        Gaussian posterior for a single component.
        If a user passes values for `q_mu` and `q_sqrt`, the routine checks that they have
        consistent and correct shapes. If a user does not specify any values for `q_mu` and
        `q_sqrt`, the routine initialises them; their shapes depend on `num_inducing` and `q_diag`.

        Note: most often the comments refer to the number of output dimensions as P,
        the number of latent GPs as L, and the number of inducing points as M. Typically P
        equals L, but this can change when certain multi-output kernels are used.

        Parameters
        ----------
        :param num_inducing: int
            Number of inducing variables, typically referred to as M.
        :param q_mu: np.array or None
            Mean of the variational Gaussian posterior. If None, the function initialises
            the mean with zeros. If not None, the shape of `q_mu` is checked.
        :param q_sqrt: np.array or None
            Cholesky factor of the covariance of the variational Gaussian posterior.
            If None, the function initialises `q_sqrt` with the identity matrix.
            If not None, the shape of `q_sqrt` is checked, depending on `q_diag`.
        :param q_diag: bool
            Used to check that `q_mu` and `q_sqrt` have the correct shape, or to
            construct them with the correct shape. If `q_diag` is True,
            `q_sqrt` is two-dimensional and only holds the square roots of the
            diagonal elements of the covariance. If False, `q_sqrt` is three-dimensional.
"""
q_mu = np.zeros((num_inducing, self.num_latent)) if q_mu is None else q_mu
q_mu_ = Parameter(q_mu, dtype=default_float(), name="q_mu") # [M, P]
if q_sqrt is None:
if self.q_diag:
ones = np.ones((num_inducing, self.num_latent), dtype=default_float())
q_sqrt_ = Parameter(ones, transform=positive(), name="q_sqrt") # [M, P]
else:
q_sqrt = [np.eye(num_inducing, dtype=default_float()) for _ in range(self.num_latent)]
q_sqrt = np.array(q_sqrt)
q_sqrt_ = Parameter(q_sqrt, transform=triangular(), name="q_sqrt") # [P, M, M]
else:
if q_diag:
assert q_sqrt.ndim == 2
self.num_latent = q_sqrt.shape[1]
q_sqrt_ = Parameter(q_sqrt, transform=positive(), name="q_sqrt") # [M, L|P]
else:
assert q_sqrt.ndim == 3
self.num_latent = q_sqrt.shape[0]
q_sqrt_ = Parameter(q_sqrt, transform=triangular(), name="q_sqrt") # [L|P, M, M]
return q_mu_, q_sqrt_
def prior_kls(self):
kls = []
q_mus = self.mu_corrections()
for c in range(self.num_components):
kls.append(kullback_leiblers.prior_kl(self.inducing_variables[c],
self.kernels[c],
q_mus[c],
self.q_sqrts[c],
whiten=self.whiten))
return kls
def prior_kl(self):
return tf.reduce_sum(self.prior_kls())
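    # Note on mu_corrections (added explanation): writing the conditional mean of a component
    # at its offset location as m(x0) = a^T q_mu, with interpolation weights a, the corrected
    # q_mu used below satisfies
    #     a^T (q_mu - (m(x0) - offset_y) / (a^T 1)) = m(x0) - (m(x0) - offset_y) = offset_y,
    # i.e. the component's posterior mean (excluding its mean function) is pinned to
    # offsets_y at offsets_x.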
    def mu_corrections(self):
        mus = []
        for c in range(self.num_components):
            Kmm = self.kernels[c](self.inducing_variables[c].Z) + \
                ops.eye(self.num_inducings[c], value=default_jitter(), dtype=default_float())
            Kmn = self.kernels[c](self.inducing_variables[c].Z, self.offsets_x[c])
            Knn = self.kernels[c](self.offsets_x[c], full=False)
            # posterior mean of this component at its offset location under the current q_mu
            mu0, _ = base_conditional(Kmn, Kmm, Knn, self.q_mus[c],
                                      full_cov=False, white=self.whiten, q_sqrt=self.q_sqrts[c])
            # total interpolation weight: the same conditional applied to a vector of ones
            n, _ = base_conditional(Kmn, Kmm, Knn, tf.ones_like(self.q_mus[c]),
                                    full_cov=False, white=self.whiten, q_sqrt=self.q_sqrts[c])
            mus.append(self.q_mus[c] - (mu0 - self.offsets_y[c]) / n)
        return mus
    def log_likelihood(self, data: Tuple[tf.Tensor, tf.Tensor]) -> tf.Tensor:
        """
        This gives a variational lower bound (ELBO) on the model likelihood:
        sum_n E_q[log p(y_n | f_n)] - KL[q(u) || p(u)].
        The expectation term is computed either from the Monte Carlo moments of the
        predictor (deterministic_optimisation=True) or by directly averaging the
        log-density over Monte Carlo samples of the predictor.
        """
        X, Y = data
        kl = self.prior_kl()
        if self.deterministic_optimisation:
            p_mean, p_var = self.predict_predictor(X)
            var_exp = self.likelihood.variational_expectations(p_mean, p_var, Y)
        else:
            p_sample = self.sample_predictor(X, self.num_samples)
            # average over the sample dimension (summed below together with the data dimension)
            var_exp = self.likelihood.log_prob(p_sample, Y) / self.num_samples
        return tf.reduce_sum(var_exp) - kl
def elbo(self, data: Tuple[tf.Tensor, tf.Tensor]) -> tf.Tensor:
"""
This returns the evidence lower bound (ELBO) of the log marginal likelihood.
"""
return self.log_marginal_likelihood(data)
    def predict_fs(self, Xnew: tf.Tensor, full_cov=False, full_output_cov=False) -> MeanAndVariance:
        mus, variances = [], []
        q_mus = self.mu_corrections()
        for c in range(self.num_components):
            # select the input columns this component operates on (ordering hard-coded via `indices`)
            Xnew_c = Xnew[:, self.indices[c]]
            mu, var = conditional(Xnew_c,
                                  self.inducing_variables[c],
                                  self.kernels[c],
                                  q_mus[c],
                                  q_sqrt=self.q_sqrts[c],
                                  full_cov=full_cov,
                                  white=self.whiten,
                                  full_output_cov=full_output_cov)
            mu += self.mean_functions[c](Xnew_c)
            mus.append(mu)
            variances.append(var)
        return tf.concat(mus, axis=-1), tf.concat(variances, axis=-1)
    def sample_predictor(self, Xnew: tf.Tensor, num_samples=1):
        fmeans, fvars = self.predict_fs(Xnew)
        # draw reparameterised samples of the component GPs and map them through the predictor
        eps = tf.random.normal(tf.concat([[num_samples], tf.shape(fmeans)], axis=0),
                               dtype=fmeans.dtype)
        fs_sample = eps * tf.sqrt(fvars) + fmeans
        return self.predictor(fs_sample)

    @abc.abstractmethod
    def predictor(self, F: tf.Tensor):
        raise NotImplementedError()

    def predict_predictor(self, Xnew: tf.Tensor):
        # Monte Carlo estimate of the predictor's mean and variance at Xnew
        p_samp = self.sample_predictor(Xnew, num_samples=self.num_samples)
        p_mean = tf.reduce_mean(p_samp, axis=0)
        return p_mean, tf.reduce_mean(tf.square(p_mean - p_samp), axis=0)
class AdditiveSVGPs(SVGPs):
    """Model whose predictor is the sum of the component GPs."""

    def predictor(self, F: tf.Tensor):
        return tf.reduce_sum(F, axis=-1, keepdims=True)

    def predict_predictor(self, Xnew: tf.Tensor):
        # for a sum of independent GPs the predictor moments are available in closed form
        f_means, f_vars = self.predict_fs(Xnew, full_cov=False, full_output_cov=False)
        p_mean = tf.reduce_sum(f_means, axis=-1, keepdims=True)
        p_var = tf.reduce_sum(f_vars, axis=-1, keepdims=True)
        return p_mean, p_var


class MultiplicativeSVGPs(SVGPs):
    """Model whose predictor is the product of the component GPs."""

    def predictor(self, F: tf.Tensor):
        return tf.reduce_prod(F, axis=-1, keepdims=True)
class Custom(SVGPs):
def __init__(self,
kernels,
likelihood,
inducing_variables,
indices,
*,
mean_functions=None,
num_latent: int = 1,
q_diag: bool = False,
q_mus=None,
q_sqrts=None,
whiten: bool = True,
num_data=None,
deterministic_optimisation=False,
num_samples=10,
offsets_x=None,
offsets_y=None):
super().__init__(kernels, likelihood, inducing_variables, indices,
mean_functions=mean_functions,
num_latent=num_latent,
q_diag=q_diag,
q_mus=q_mus,
q_sqrts=q_sqrts,
whiten=whiten,
num_data=num_data,
deterministic_optimisation=deterministic_optimisation,
num_samples=num_samples,
offsets_x=offsets_x,
offsets_y=offsets_y)
    def predictor(self, F: tf.Tensor):
        # predictor: first component plus the product of the second and third components
        return (F[..., 0] + F[..., 1] * F[..., 2])[..., None]
class CNY(SVGPs):
    def __init__(self,
                 kernels,
                 likelihood,
                 inducing_variables,
                 indices,
                 *,
                 mean_functions=None,
                 num_latent: int = 1,
                 q_diag: bool = False,
                 q_mus=None,
                 q_sqrts=None,
                 whiten: bool = True,
                 num_data=None,
                 new_years=None):
        self.new_years = new_years
        self.num_years = len(new_years)
        num_latent = 1  # this model uses a single latent process per component
        super().__init__(kernels, likelihood, inducing_variables, indices,
                         mean_functions=mean_functions,
                         num_latent=num_latent,
                         q_diag=q_diag,
                         q_mus=q_mus,
                         q_sqrts=q_sqrts,
                         whiten=whiten,
                         num_data=num_data)

    def predictor(self, F: tf.Tensor):
        # CNY combines its components with per-year input shifts directly in
        # log_likelihood / predict_years, so the generic predictor is not used.
        raise NotImplementedError("CNY combines its components in log_likelihood / predict_years")
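    # Hard-coded per-year model (added note, read off the code below): for year ny the
    # observations Y[:, ny] are modelled as
    #     f(x) = f_trend(x) + f_new_year(x - new_years[ny]),
    # i.e. the first component is a long-term trend and the second is a new-year effect
    # evaluated on inputs shifted by that year's new-year date.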
def log_likelihood(self, data: Tuple[tf.Tensor, tf.Tensor]) -> tf.Tensor:
"""
This gives a variational bound on the model likelihood.
"""
X, Y = data
kl = self.prior_kl()
f_means, f_vars = self.predict_fs(X, full_cov=False, full_output_cov=False)
f_trend_mean = f_means[:, 0:1]
f_trend_var = f_vars[:, 0:1]
var_exp = 0
for ny in range(self.num_years):
X_ = X - self.new_years[ny]
f_means, f_vars = self.predict_fs(X_, full_cov=False, full_output_cov=False)
f_ny_mean = f_means[:, 1:2]
f_ny_var = f_vars[:, 1:2]
            # hard-coded model: trend plus the year-shifted new-year component (see note above)
f_mean = f_trend_mean + f_ny_mean
f_var = f_trend_var + f_ny_var
var_exp += tf.reduce_sum(
self.likelihood.variational_expectations(f_mean, f_var, Y[:, ny:ny+1])
)
return var_exp - kl
    def predict_years(self, Xnew: tf.Tensor, full_cov=False, full_output_cov=False) -> MeanAndVariance:
        f_means, f_vars = self.predict_fs(Xnew, full_cov=False, full_output_cov=False)
        f_trend_mean = f_means[:, 0:1]
        f_trend_var = f_vars[:, 0:1]
        f_means_year = []
        f_vars_year = []
        for ny in range(self.num_years):
            X_ = Xnew - self.new_years[ny]
            f_means, f_vars = self.predict_fs(X_, full_cov=False, full_output_cov=False)
            f_ny_mean = f_means[:, 1:2]
            f_ny_var = f_vars[:, 1:2]
            # hard-coded model: trend plus the year-shifted new-year component (see note above)
            f_mean = f_trend_mean + f_ny_mean
            f_var = f_trend_var + f_ny_var
            f_vars_year.append(f_var)
            f_means_year.append(f_mean)
        return tf.concat(f_means_year, axis=-1), tf.concat(f_vars_year, axis=-1)
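
# ---------------------------------------------------------------------------
# Usage sketch (added for illustration; not part of the original model code).
# Assumptions: this file lives at gpflow/models/svgps.py inside a GPflow 2
# checkout from around this revision, so it can be run with
# `python -m gpflow.models.svgps`, and the standard GPflow kernels, likelihoods
# and inducing variables are available. The toy data, inducing point locations
# and column indices below are made up for the example.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import gpflow

    rng = np.random.RandomState(0)
    X = rng.rand(200, 2)  # two input columns, one per component GP
    Y = np.sin(6 * X[:, :1]) + 0.5 * X[:, 1:2] + 0.05 * rng.randn(200, 1)

    kernels = [gpflow.kernels.SquaredExponential(), gpflow.kernels.SquaredExponential()]
    inducing_variables = [
        gpflow.inducing_variables.InducingPoints(rng.rand(10, 1)),
        gpflow.inducing_variables.InducingPoints(rng.rand(10, 1)),
    ]
    indices = [[0], [1]]  # column(s) of X seen by each component

    # sum-of-GPs predictor; the same construction works for MultiplicativeSVGPs
    model = AdditiveSVGPs(kernels, gpflow.likelihoods.Gaussian(), inducing_variables,
                          indices, num_data=X.shape[0])
    print("initial ELBO:", model.elbo((tf.constant(X), tf.constant(Y))).numpy())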