https://github.com/pymc-devs/pymc3
Raw File
Tip revision: 419af0688353292c0a356cddbb9271737e89a723 authored by Luke Lewis-Borrell on 26 October 2023, 05:24:39 UTC
Support logp derivation of `power(base, rv)` (#6962)
Tip revision: 419af06
simulator.py
#   Copyright 2023 The PyMC Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging

import numpy as np
import pytensor
import pytensor.tensor as pt

from pytensor.graph.op import Apply, Op
from pytensor.tensor.random.op import RandomVariable
from pytensor.tensor.variable import TensorVariable
from scipy.spatial import cKDTree

from pymc.distributions.distribution import Distribution, _moment
from pymc.logprob.abstract import _logprob
from pymc.pytensorf import floatX

__all__ = ["Simulator"]

_log = logging.getLogger(__name__)


class SimulatorRV(RandomVariable):
    """
    Base class for SimulatorRVs

    This should be subclassed when defining custom Simulator objects.
    """

    name = "SimulatorRV"
    ndim_supp = None
    ndims_params = None
    dtype = "floatX"
    _print_name = ("Simulator", "\\operatorname{Simulator}")

    fn = None
    _distance = None
    _sum_stat = None
    epsilon = None

    @classmethod
    def rng_fn(cls, *args, **kwargs):
        return cls.fn(*args, **kwargs)

    @classmethod
    def distance(cls, *args, **kwargs):
        return cls._distance(*args, **kwargs)

    @classmethod
    def sum_stat(cls, *args, **kwargs):
        return cls._sum_stat(*args, **kwargs)


class Simulator(Distribution):
    r"""
    Simulator distribution, used for Approximate Bayesian Inference (ABC)
    with Sequential Monte Carlo (SMC) sampling via :func:`~pymc.sample_smc`.

    Simulator distributions have a stochastic pseudo-loglikelihood defined by
    a distance metric between the observed and simulated data, and tweaked
    by a hyper-parameter ``epsilon``.

    Parameters
    ----------
    fn : callable
        Python random simulator function. Should expect the following signature
        ``(rng, arg1, arg2, ... argn, size)``, where rng is a ``numpy.random.Generator``
        and ``size`` defines the size of the desired sample.
    *unnamed_params : list of TensorVariable
        Parameters used by the Simulator random function. Each parameter can be passed
        by order after fn, for example ``param1, param2, ..., paramN``. params can also
        be passed with keyword argument "params".
    params : list of TensorVariable
        Keyword form of ''unnamed_params''.
        One of unnamed_params or params must be provided.
        If passed both unnamed_params and params, an error is raised.
    distance : PyTensor_Op, callable or str, default "gaussian"
        Distance function. Available options are ``"gaussian"``, ``"laplace"``,
        ``"kullback_leibler"`` or a user defined function (or PyTensor_Op) that takes
        ``epsilon``, the summary statistics of observed_data and the summary statistics
        of simulated_data as input.

        ``gaussian``: :math:`-0.5 \left(\left(\frac{xo - xs}{\epsilon}\right)^2\right)`

        ``laplace``: :math:`-{\left(\frac{|xo - xs|}{\epsilon}\right)}`

        ``kullback_leibler``: :math:`\frac{d}{n} \frac{1}{\epsilon} \sum_i^n -\log \left( \frac{{\nu_d}_i}{{\rho_d}_i} \right) + \log_r` [1]_

        ``distance="gaussian"`` + ``sum_stat="sort"`` is equivalent to the 1D 2-wasserstein distance

        ``distance="laplace"`` + ``sum_stat="sort"`` is equivalent to the the 1D 1-wasserstein distance
    sum_stat : PyTensor_Op, callable or str, default "identity"
        Summary statistic function. Available options are ``"identity"``,
        ``"sort"``, ``"mean"``, ``"median"``. If a callable (or PyTensor_Op) is defined,
        it should return a 1d numpy array (or PyTensor vector).
    epsilon : tensor_like of float, default 1.0
        Scaling parameter for the distance functions. It should be a float or
        an array of the same size of the output of ``sum_stat``.
    ndim_supp : int, default 0
        Number of dimensions of the SimulatorRV (0 for scalar, 1 for vector, etc.)
    ndims_params : list of int, optional
        Number of minimum dimensions of each parameter of the RV. For example,
        if the Simulator accepts two scalar inputs, it should be ``[0, 0]``.
        Default to list of 0 with length equal to the number of parameters.
    class_name : str, optional
        Suffix name for the RandomVariable class which will wrap the Simulator methods.

    Examples
    --------
    .. code-block:: python

        def simulator_fn(rng, loc, scale, size):
            return rng.normal(loc, scale, size=size)

        with pm.Model() as m:
            loc = pm.Normal("loc", 0, 1)
            scale = pm.HalfNormal("scale", 1)
            simulator = pm.Simulator("simulator", simulator_fn, loc, scale, observed=data)
            idata = pm.sample_smc()

    References
    ----------
    .. [1] PĂ©rez-Cruz, F. (2008, July). Kullback-Leibler divergence
        estimation of continuous distributions. In 2008 IEEE international
        symposium on information theory (pp. 1666-1670). IEEE.
        `link <https://ieeexplore.ieee.org/document/4595271>`__

    """

    rv_type = SimulatorRV

    def __new__(cls, name, *args, **kwargs):
        kwargs.setdefault("class_name", f"Simulator_{name}")
        return super().__new__(cls, name, *args, **kwargs)

    @classmethod
    def dist(  # type: ignore
        cls,
        fn,
        *unnamed_params,
        params=None,
        distance="gaussian",
        sum_stat="identity",
        epsilon=1,
        ndim_supp=0,
        ndims_params=None,
        dtype="floatX",
        class_name: str = "Simulator",
        **kwargs,
    ):
        if not isinstance(distance, Op):
            if distance == "gaussian":
                distance = gaussian
            elif distance == "laplace":
                distance = laplace
            elif distance == "kullback_leibler":
                raise NotImplementedError("KL not refactored yet")
                # TODO: Wrap KL in pytensor OP
                # distance = KullbackLeibler(observed)
                # if sum_stat != "identity":
                #     _log.info(f"Automatically setting sum_stat to identity as expected by {distance}")
                #     sum_stat = "identity"
            elif callable(distance):
                distance = create_distance_op_from_fn(distance)
            else:
                raise ValueError(f"The distance metric {distance} is not implemented")

        if not isinstance(sum_stat, Op):
            if sum_stat == "identity":
                sum_stat = identity
            elif sum_stat == "sort":
                sum_stat = pt.sort
            elif sum_stat == "mean":
                sum_stat = pt.mean
            elif sum_stat == "median":
                # Missing in PyTensor, see pytensor/issues/525
                sum_stat = create_sum_stat_op_from_fn(np.median)
            elif callable(sum_stat):
                sum_stat = create_sum_stat_op_from_fn(sum_stat)
            else:
                raise ValueError(f"The summary statistic {sum_stat} is not implemented")

        epsilon = pt.as_tensor_variable(floatX(epsilon))

        if params is None:
            params = unnamed_params
        else:
            if unnamed_params:
                raise ValueError("Cannot pass both unnamed parameters and `params`")

        # Assume scalar ndims_params
        if ndims_params is None:
            ndims_params = [0] * len(params)

        return super().dist(
            params,
            fn=fn,
            ndim_supp=ndim_supp,
            ndims_params=ndims_params,
            dtype=dtype,
            distance=distance,
            sum_stat=sum_stat,
            epsilon=epsilon,
            class_name=class_name,
            **kwargs,
        )

    @classmethod
    def rv_op(
        cls,
        *params,
        fn,
        ndim_supp,
        ndims_params,
        dtype,
        distance,
        sum_stat,
        epsilon,
        class_name,
        **kwargs,
    ):
        sim_op = type(
            class_name,
            (SimulatorRV,),
            dict(
                name=class_name,
                ndim_supp=ndim_supp,
                ndims_params=ndims_params,
                dtype=dtype,
                inplace=False,
                fn=fn,
                _distance=distance,
                _sum_stat=sum_stat,
                epsilon=epsilon,
            ),
        )()
        return sim_op(*params, **kwargs)


@_moment.register(SimulatorRV)  # type: ignore
def simulator_moment(op, rv, *inputs):
    sim_inputs = inputs[3:]
    # Take the mean of 10 draws
    multiple_sim = rv.owner.op(*sim_inputs, size=pt.concatenate([[10], rv.shape]))
    return pt.mean(multiple_sim, axis=0)


@_logprob.register(SimulatorRV)
def simulator_logp(op, values, *inputs, **kwargs):
    (value,) = values

    # Use a new rng to avoid non-randomness in parallel sampling
    # TODO: Model rngs should be updated prior to multiprocessing split,
    #  in which case this would not be needed. However, that would have to be
    #  done for every sampler that may accommodate Simulators
    rng = pytensor.shared(np.random.default_rng(), name="simulator_rng")
    # Create a new simulatorRV with identical inputs as the original one
    sim_value = op.make_node(rng, *inputs[1:]).default_output()
    sim_value.name = "simulator_value"

    return op.distance(
        op.epsilon,
        op.sum_stat(value),
        op.sum_stat(sim_value),
    )


def identity(x):
    """Identity function, used as a summary statistics."""
    return x


def gaussian(epsilon, obs_data, sim_data):
    """Gaussian kernel."""
    return -0.5 * ((obs_data - sim_data) / epsilon) ** 2


def laplace(epsilon, obs_data, sim_data):
    """Laplace kernel."""
    return -pt.abs((obs_data - sim_data) / epsilon)


class KullbackLeibler:
    """Approximate Kullback-Leibler."""

    def __init__(self, obs_data):
        if obs_data.ndim == 1:
            obs_data = obs_data[:, None]
        n, d = obs_data.shape
        rho_d, _ = cKDTree(obs_data).query(obs_data, 2)
        self.rho_d = rho_d[:, 1]
        self.d_n = d / n
        self.log_r = np.log(n / (n - 1))
        self.obs_data = obs_data

    def __call__(self, epsilon, obs_data, sim_data):
        if sim_data.ndim == 1:
            sim_data = sim_data[:, None]
        nu_d, _ = cKDTree(sim_data).query(self.obs_data, 1)
        return self.d_n * np.sum(-np.log(nu_d / self.rho_d) / epsilon) + self.log_r


def create_sum_stat_op_from_fn(fn):
    vectorX = pt.dvector if pytensor.config.floatX == "float64" else pt.fvector

    # Check if callable returns TensorVariable with dummy inputs
    try:
        res = fn(vectorX())
        if isinstance(res, TensorVariable):
            return fn
    except Exception:
        pass

    # Otherwise, automatically wrap in PyTensor Op
    class SumStat(Op):
        def make_node(self, x):
            x = pt.as_tensor_variable(x)
            return Apply(self, [x], [vectorX()])

        def perform(self, node, inputs, outputs):
            (x,) = inputs
            outputs[0][0] = np.atleast_1d(fn(x)).astype(pytensor.config.floatX)

    return SumStat()


def create_distance_op_from_fn(fn):
    scalarX = pt.dscalar if pytensor.config.floatX == "float64" else pt.fscalar
    vectorX = pt.dvector if pytensor.config.floatX == "float64" else pt.fvector

    # Check if callable returns TensorVariable with dummy inputs
    try:
        res = fn(scalarX(), vectorX(), vectorX())
        if isinstance(res, TensorVariable):
            return fn
    except Exception:
        pass

    # Otherwise, automatically wrap in PyTensor Op
    class Distance(Op):
        def make_node(self, epsilon, obs_data, sim_data):
            epsilon = pt.as_tensor_variable(epsilon)
            obs_data = pt.as_tensor_variable(obs_data)
            sim_data = pt.as_tensor_variable(sim_data)
            return Apply(self, [epsilon, obs_data, sim_data], [vectorX()])

        def perform(self, node, inputs, outputs):
            eps, obs_data, sim_data = inputs
            outputs[0][0] = np.atleast_1d(fn(eps, obs_data, sim_data)).astype(
                pytensor.config.floatX
            )

    return Distance()
back to top