Raw File
# Copyright 2017-2020 The GPflow Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
import warnings
from functools import wraps
from typing import Callable, Iterable, List, Optional, Tuple, Union

import numpy as np
import tensorflow as tf

from ..base import AnyNDArray, TensorType
from ..config import default_float
from ..utilities import to_default_float
from .gauss_hermite import NDiagGHQuadrature


def hermgauss(n: int) -> Tuple[AnyNDArray, AnyNDArray]:
    x, w = np.polynomial.hermite.hermgauss(n)
    x, w = x.astype(default_float()), w.astype(default_float())
    return x, w


def mvhermgauss(H: int, D: int) -> Tuple[AnyNDArray, AnyNDArray]:
    """
    Return the evaluation locations 'xn', and weights 'wn' for a multivariate
    Gauss-Hermite quadrature.

    The outputs can be used to approximate the following type of integral:
    int exp(-x)*f(x) dx ~ sum_i w[i,:]*f(x[i,:])

    :param H: Number of Gauss-Hermite evaluation points.
    :param D: Number of input dimensions. Needs to be known at call-time.
    :return: eval_locations 'x' (H**DxD), weights 'w' (H**D)
    """
    gh_x, gh_w = hermgauss(H)
    x: AnyNDArray = np.array(list(itertools.product(*(gh_x,) * D)))  # H**DxD
    w = np.prod(np.array(list(itertools.product(*(gh_w,) * D))), 1)  # H**D
    return x, w


def mvnquad(
    func: Callable[[tf.Tensor], tf.Tensor],
    means: TensorType,
    covs: TensorType,
    H: int,
    Din: Optional[int] = None,
    Dout: Optional[Tuple[int, ...]] = None,
) -> tf.Tensor:
    """
    Computes N Gaussian expectation integrals of a single function 'f'
    using Gauss-Hermite quadrature.
    :param f: integrand function. Takes one input of shape ?xD.
    :param means: NxD
    :param covs: NxDxD
    :param H: Number of Gauss-Hermite evaluation points.
    :param Din: Number of input dimensions. Needs to be known at call-time.
    :param Dout: Number of output dimensions. Defaults to (). Dout is assumed
    to leave out the item index, i.e. f actually maps (?xD)->(?x*Dout).
    :return: quadratures (N,*Dout)
    """
    # Figure out input shape information
    if Din is None:
        Din = means.shape[1]

    if Din is None:
        raise ValueError(
            "If `Din` is passed as `None`, `means` must have a known shape. "
            "Running mvnquad in `autoflow` without specifying `Din` and `Dout` "
            "is problematic. Consider using your own session."
        )  # pragma: no cover

    xn, wn = mvhermgauss(H, Din)
    N = means.shape[0]

    # transform points based on Gaussian parameters
    cholXcov = tf.linalg.cholesky(covs)  # NxDxD
    Xt = tf.linalg.matmul(
        cholXcov, tf.tile(xn[None, :, :], (N, 1, 1)), transpose_b=True
    )  # NxDxH**D
    X = 2.0 ** 0.5 * Xt + tf.expand_dims(means, 2)  # NxDxH**D
    Xr = tf.reshape(tf.transpose(X, [2, 0, 1]), (-1, Din))  # (H**D*N)xD

    # perform quadrature
    fevals = func(Xr)
    if Dout is None:
        Dout = tuple((d if type(d) is int else d.value) for d in fevals.shape[1:])

    if any([d is None for d in Dout]):
        raise ValueError(
            "If `Dout` is passed as `None`, the output of `func` must have known "
            "shape. Running mvnquad in `autoflow` without specifying `Din` and `Dout` "
            "is problematic. Consider using your own session."
        )  # pragma: no cover
    fX = tf.reshape(
        fevals,
        (
            H ** Din,
            N,
        )
        + Dout,
    )
    wr = np.reshape(wn * np.pi ** (-Din * 0.5), (-1,) + (1,) * (1 + len(Dout)))
    return tf.reduce_sum(fX * wr, 0)


def ndiagquad(
    funcs: Union[Callable[..., tf.Tensor], Iterable[Callable[..., tf.Tensor]]],
    H: int,
    Fmu: Union[TensorType, Tuple[TensorType, ...], List[TensorType]],
    Fvar: Union[TensorType, Tuple[TensorType, ...], List[TensorType]],
    logspace: bool = False,
    **Ys: TensorType,
) -> tf.Tensor:
    """
    Computes N Gaussian expectation integrals of one or more functions
    using Gauss-Hermite quadrature. The Gaussians must be independent.

    The means and variances of the Gaussians are specified by Fmu and Fvar.
    The N-integrals are assumed to be taken wrt the last dimensions of Fmu, Fvar.

    :param funcs: the integrand(s):
        Callable or Iterable of Callables that operates elementwise
    :param H: number of Gauss-Hermite quadrature points
    :param Fmu: array/tensor or `Din`-tuple/list thereof
    :param Fvar: array/tensor or `Din`-tuple/list thereof
    :param logspace: if True, funcs are the log-integrands and this calculates
        the log-expectation of exp(funcs)
    :param **Ys: arrays/tensors; deterministic arguments to be passed by name

    Fmu, Fvar, Ys should all have same shape, with overall size `N`
    :return: shape is the same as that of the first Fmu
    """
    warnings.warn(
        "Please use gpflow.quadrature.NDiagGHQuadrature instead "
        "(note the changed convention of how multi-dimensional quadrature is handled)",
        DeprecationWarning,
    )

    n_gh = H
    if isinstance(Fmu, (tuple, list)):
        dim = len(Fmu)
        shape = tf.shape(Fmu[0])
        Fmu = tf.stack(Fmu, axis=-1)
        Fvar = tf.stack(Fvar, axis=-1)
    else:
        dim = 1
        shape = tf.shape(Fmu)

    Fmu = tf.reshape(Fmu, (-1, dim))
    Fvar = tf.reshape(Fvar, (-1, dim))

    Ys = {Yname: tf.reshape(Y, (-1, 1)) for Yname, Y in Ys.items()}

    def wrapper(old_fun: Callable[..., tf.Tensor]) -> Callable[..., tf.Tensor]:
        @wraps(old_fun)
        def new_fun(X: TensorType, **Ys: TensorType) -> tf.Tensor:
            Xs = tf.unstack(tf.expand_dims(X, axis=-2), axis=-1)
            fun_eval = old_fun(*Xs, **Ys)
            return tf.cond(
                pred=tf.less(tf.rank(fun_eval), tf.rank(X)),
                true_fn=lambda: fun_eval[..., tf.newaxis],
                false_fn=lambda: fun_eval,
            )

        return new_fun

    if isinstance(funcs, Iterable):
        funcs = [wrapper(f) for f in funcs]
    else:
        funcs = wrapper(funcs)

    quadrature = NDiagGHQuadrature(dim, n_gh)
    if logspace:
        result = quadrature.logspace(funcs, Fmu, Fvar, **Ys)
    else:
        result = quadrature(funcs, Fmu, Fvar, **Ys)

    if isinstance(result, list):
        result = [tf.reshape(r, shape) for r in result]
    else:
        result = tf.reshape(result, shape)

    return result


def ndiag_mc(
    funcs: Union[Callable[..., tf.Tensor], Iterable[Callable[..., tf.Tensor]]],
    S: int,
    Fmu: TensorType,
    Fvar: TensorType,
    logspace: bool = False,
    epsilon: Optional[TensorType] = None,
    **Ys: TensorType,
) -> tf.Tensor:
    """
    Computes N Gaussian expectation integrals of one or more functions
    using Monte Carlo samples. The Gaussians must be independent.

    :param funcs: the integrand(s):
        Callable or Iterable of Callables that operates elementwise
    :param S: number of Monte Carlo sampling points
    :param Fmu: array/tensor
    :param Fvar: array/tensor
    :param logspace: if True, funcs are the log-integrands and this calculates
        the log-expectation of exp(funcs)
    :param **Ys: arrays/tensors; deterministic arguments to be passed by name

    Fmu, Fvar, Ys should all have same shape, with overall size `N`
    :return: shape is the same as that of the first Fmu
    """
    N, D = tf.shape(Fmu)[0], tf.shape(Fvar)[1]

    if epsilon is None:
        epsilon = tf.random.normal(shape=[S, N, D], dtype=default_float())

    mc_x = Fmu[None, :, :] + tf.sqrt(Fvar[None, :, :]) * epsilon
    mc_Xr = tf.reshape(mc_x, (S * N, D))

    for name, Y in Ys.items():
        D_out = Y.shape[1]
        # we can't rely on broadcasting and need tiling
        mc_Yr = tf.tile(Y[None, ...], [S, 1, 1])  # [S, N, D]_out
        Ys[name] = tf.reshape(mc_Yr, (S * N, D_out))  # S * [N, _]out

    def eval_func(func: Callable[..., tf.Tensor]) -> tf.Tensor:
        feval = func(mc_Xr, **Ys)
        feval = tf.reshape(feval, (S, N, -1))
        if logspace:
            log_S = tf.math.log(to_default_float(S))
            return tf.reduce_logsumexp(feval, axis=0) - log_S  # [N, D]
        else:
            return tf.reduce_mean(feval, axis=0)

    if isinstance(funcs, Iterable):
        return [eval_func(f) for f in funcs]
    else:
        return eval_func(funcs)
back to top