# Revision f4ac4c5a52603afdd12c258958456bd170df7092 authored by ST John on 24 March 2022, 08:32:27 UTC, committed by ST John on 24 March 2022, 08:32:27 UTC
# 1 parent cc7ed07
# Raw File
# linears.py
# Copyright 2017-2020 The GPflow Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Type, Union

import tensorflow as tf

from .. import kernels
from .. import mean_functions as mfn
from ..inducing_variables import InducingPoints
from ..probability_distributions import DiagonalGaussian, Gaussian, MarkovGaussian
from . import dispatch
from .expectations import expectation

# Type of the `None` singleton. The registrations in this module pass it to
# `dispatch.expectation.register` for the argument slots whose value is None.
NoneType: Type[None] = type(None)


@dispatch.expectation.register(Gaussian, kernels.Linear, NoneType, NoneType, NoneType)
def _expectation_gaussian_linear(
    p: Gaussian, kernel: kernels.Linear, _: None, __: None, ___: None, nghp: None = None
) -> tf.Tensor:
    """
    Expected diagonal of the Linear kernel matrix under a Gaussian over the inputs:

        <diag(K_{X, X})>_p(X)

    :param p: Gaussian input distribution; its `mu` and `cov` are restricted to
        the kernel's active dimensions before use
    :param kernel: Linear kernel K_{.,.}
    :param _: placeholder for the dispatch slot registered as NoneType
    :param __: placeholder for the dispatch slot registered as NoneType
    :param ___: placeholder for the dispatch slot registered as NoneType
    :param nghp: not used by this implementation
    :return: N
    """
    # Restrict the mean and covariance to the kernel's active dimensions.
    # `kernel.slice` returns a pair; the second entry (for the `None` input) is discarded.
    mean, _discarded = kernel.slice(p.mu, None)
    cov = kernel.slice_cov(p.cov)

    # Per-dimension second moment E[x_d^2] = Var[x_d] + E[x_d]^2.
    second_moment = tf.linalg.diag_part(cov) + mean ** 2

    # Weight by the kernel variance and sum over the dimension axis.
    return tf.reduce_sum(kernel.variance * second_moment, axis=1)


@dispatch.expectation.register(Gaussian, kernels.Linear, InducingPoints, NoneType, NoneType)
def _expectation_gaussian_linear_inducingpoints(
    p: Gaussian,
    kernel: kernels.Linear,
    inducing_variable: InducingPoints,
    _: None,
    __: None,
    nghp: None = None,
) -> tf.Tensor:
    """
    Expected cross-covariance between the inputs and the inducing points:

        <K_{X, Z}>_p(X)

    Only the mean of `p` enters the computation; `p.cov` is not read.

    :param p: Gaussian input distribution
    :param kernel: Linear kernel K_{.,.}
    :param inducing_variable: inducing points providing `Z`
    :param _: placeholder for the dispatch slot registered as NoneType
    :param __: placeholder for the dispatch slot registered as NoneType
    :param nghp: not used by this implementation
    :return: NxM
    """
    # Restrict both the inducing locations and the input mean to the active dimensions.
    inducing_points, mean = kernel.slice(inducing_variable.Z, p.mu)

    # Scale the inducing locations by the kernel variance, then contract with the mean.
    scaled_inducing = inducing_points * kernel.variance
    return tf.linalg.matmul(mean, scaled_inducing, transpose_b=True)


@dispatch.expectation.register(Gaussian, kernels.Linear, InducingPoints, mfn.Identity, NoneType)
def _expectation_gaussian_linear_inducingpoints__identity(
    p: Gaussian,
    kernel: kernels.Linear,
    inducing_variable: InducingPoints,
    mean: mfn.Identity,
    _: None,
    nghp: None = None,
) -> tf.Tensor:
    """
    Expected product of the Linear kernel and the Identity mean function:

        expectation[n] = <K_{Z, x_n} x_n^T>_p(x_n)

    `p.mu`, `p.cov` and `inducing_variable.Z` are used as given; no
    active-dimension slicing is applied in this implementation.

    :param p: Gaussian input distribution
    :param kernel: Linear kernel K_{.,.}
    :param inducing_variable: inducing points providing `Z`
    :param mean: Identity mean function (selects this implementation; not read)
    :param _: placeholder for the dispatch slot registered as NoneType
    :param nghp: not used by this implementation
    :return: NxMxD
    """
    input_mean, input_cov = p.mu, p.cov
    num_data = tf.shape(input_mean)[0]

    # Second moment: covariance plus the outer product of the mean with itself.
    mean_outer = tf.expand_dims(input_mean, -1) * tf.expand_dims(input_mean, 1)
    second_moment = input_cov + mean_outer

    # Variance-scaled inducing locations, repeated once per data point.
    scaled_Z = kernel.variance * inducing_variable.Z  # MxD
    batched_Z = tf.tile(tf.expand_dims(scaled_Z, axis=0), [num_data, 1, 1])  # NxMxD

    return tf.linalg.matmul(batched_Z, second_moment)


@dispatch.expectation.register(
    MarkovGaussian, kernels.Linear, InducingPoints, mfn.Identity, NoneType
)
def _expectation_markov_linear_inducingpoints__identity(
    p: MarkovGaussian,
    kernel: kernels.Linear,
    inducing_variable: InducingPoints,
    mean: mfn.Identity,
    _: None,
    nghp: None = None,
) -> tf.Tensor:
    """
    Expected product of the Linear kernel at one step and the Identity mean
    function at the following step:

        expectation[n] = <K_{Z, x_n} x_{n+1}^T>_p(x_{n:n+1})

    `p.mu`, `p.cov` and `inducing_variable.Z` are used as given; no
    active-dimension slicing is applied in this implementation.

    :param p: MarkovGaussian distribution (p.cov 2x(N+1)xDxD)
    :param kernel: Linear kernel K_{.,.}
    :param inducing_variable: inducing points providing `Z`
    :param mean: Identity mean function (selects this implementation; not read)
    :param _: placeholder for the dispatch slot registered as NoneType
    :param nghp: not used by this implementation
    :return: NxMxD
    """
    chain_mean, chain_cov = p.mu, p.cov

    # Pair each step with its successor; there is one pair fewer than there are steps.
    mean_current, mean_next = chain_mean[:-1], chain_mean[1:]
    num_pairs = tf.shape(chain_mean)[0] - 1

    # Second slab of p.cov without its last entry -- presumably the
    # cross-covariances between consecutive steps; confirm against MarkovGaussian.
    cross_cov = chain_cov[1, :-1]
    cross_moment = cross_cov + (
        tf.expand_dims(mean_current, -1) * tf.expand_dims(mean_next, 1)
    )  # NxDxD

    # Variance-scaled inducing locations, repeated once per pair.
    scaled_Z = kernel.variance * inducing_variable.Z  # MxD
    batched_Z = tf.tile(tf.expand_dims(scaled_Z, axis=0), [num_pairs, 1, 1])  # NxMxD

    return tf.linalg.matmul(batched_Z, cross_moment)


@dispatch.expectation.register(
    (Gaussian, DiagonalGaussian), kernels.Linear, InducingPoints, kernels.Linear, InducingPoints
)
def _expectation_gaussian_linear_inducingpoints__linear_inducingpoints(
    p: Union[Gaussian, DiagonalGaussian],
    kern1: kernels.Linear,
    feat1: InducingPoints,
    kern2: kernels.Linear,
    feat2: InducingPoints,
    nghp: None = None,
) -> tf.Tensor:
    """
    Expected product of two Linear kernel cross-covariances:

        expectation[n] = <Ka_{Z1, x_n} Kb_{x_n, Z2}>_p(x_n)

    Two cases are handled:

    * `p` is a DiagonalGaussian (p.cov NxD) and the kernels act on separate
      dimensions: the joint expectation factorises into the product of the two
      single-kernel expectations, so Ka, Kb and Z1, Z2 may differ.
    * Otherwise both kernels and both inducing variables must compare equal,
      and the expectation is computed from the second moment of `p`.

    :param p: Gaussian or DiagonalGaussian input distribution
    :param kern1: Linear kernel Ka
    :param feat1: inducing points Z1
    :param kern2: Linear kernel Kb
    :param feat2: inducing points Z2
    :param nghp: not used by this implementation
    :raises NotImplementedError: if neither case above applies
    :return: NxMxM
    """
    diagonal_input = isinstance(p, DiagonalGaussian)

    # Factorised case: no joint expectation is required.
    if kern1.on_separate_dims(kern2) and diagonal_input:
        first = expectation(p, (kern1, feat1))
        second = expectation(p, (kern2, feat2))
        return first[:, :, None] * second[:, None, :]

    if kern1 != kern2 or feat1 != feat2:
        raise NotImplementedError(
            "The expectation over two kernels has only an "
            "analytical implementation if both kernels are equal."
        )

    # From here on both kernels / inducing variables are the same, so kern1 and
    # feat1 stand in for either of them.

    # A diagonal covariance is expanded to full matrices before slicing to the
    # kernel's active dimensions.
    full_cov = tf.linalg.diag(p.cov) if diagonal_input else p.cov
    cov = kern1.slice_cov(full_cov)
    inducing_points, input_mean = kern1.slice(feat1.Z, p.mu)

    num_data = tf.shape(input_mean)[0]

    # Second moment: covariance plus the outer product of the mean with itself.
    second_moment = cov + input_mean[:, None, :] * input_mean[:, :, None]  # NxDxD

    # Variance-scaled inducing locations, repeated once per data point.
    scaled_Z = kern1.variance * inducing_points
    batched_Z = tf.tile(tf.expand_dims(scaled_Z, axis=0), [num_data, 1, 1])  # NxMxD

    # Sandwich the second moment between the scaled inducing locations.
    left_product = tf.linalg.matmul(batched_Z, second_moment)
    return tf.linalg.matmul(left_product, batched_Z, transpose_b=True)
# back to top