hyperbolicTSNE/cost_functions_.py (revision 15b6abe1dc75373a5e9bb944d331f209a67a88d2, authored by Martin Skrodzki, 18 December 2023; archived by Software Heritage)
""" Implementation of Hyperbolic Kullback-Leibler Divergence

The cost function class has two main methods:
 - obj:     Objective function giving the cost function's value.
 - grad:    Gradient of the cost function.
"""

import ctypes

import numpy as np

from sklearn.utils._openmp_helpers import _openmp_effective_n_threads

from hyperbolicTSNE.hyperbolic_barnes_hut.tsne import gradient

MACHINE_EPSILON = np.finfo(np.double).eps


def check_params(params):
    """Checks params dict includes supported key-values.
    Raises an exception if this is not the case.

    Parameters
    ----------
    params : dict
        Cost function params in key-value format.
    """
    if "method" not in params or "params" not in params:
        raise ValueError("`other_params` should include a method string `method`, its params `params`.")
    if not isinstance(params["method"], str):
        raise TypeError("`method` of cost function should be a string.")
    if params["params"] is not None and not isinstance(params["params"], dict):
        raise TypeError("`params` should be either None or a dict with the appropriate setup parameters")

    general_params = ["num_threads", "verbose", "degrees_of_freedom", "calc_both", "area_split", "grad_fix"]
    if params["method"] == "exact":
        all_params = general_params + ["skip_num_points"]
    elif params["method"] == "barnes-hut":
        all_params = general_params + ["angle"]
    else:
        raise ValueError("HyperbolicKL method is not a valid one (available methods are `exact` and `barnes-hut`)")

    for p in params["params"]:
        if p not in all_params:
            raise ValueError(f"{p} is not in the param set of the `{params['method']}` version of HyperbolicKL.")
    for p in all_params:
        if p not in params["params"]:
            raise ValueError(
                f"The `{p}` param is necessary for the `{params['method']}` version of HyperbolicKL. "
                f"Please set a value or use a preset."
            )


class HyperbolicKL:
    """
    Hyperbolic Kullback-Leibler Divergence cost function.

    Parameters
    ----------
    n_components : int
        Dimension of the embedded space.
    other_params : dict
        Cost function params in key-value format. Required; see the
        `exact_tsne` and `bh_tsne` presets.
    """
    def __init__(self, *, n_components, other_params=None):
        if other_params is None:
            raise ValueError(
                "No `other_params` specified for HyperbolicKL, please add your params or select one of the presets."
            )
        self.n_components = n_components
        self.params = other_params

        self.results = []

    @property
    def params(self):
        return self._params

    @params.setter
    def params(self, other_params):
        try:
            default_params = self._params
        except AttributeError:
            default_params = {}
        if not isinstance(other_params, dict):
            raise TypeError("`other_params` should be a dict with the cost function parameters.")
        else:
            for k, v in other_params.items():
                default_params[k] = v
            check_params(default_params)
            self._params = default_params
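
    # Note (usage sketch): the merge above is shallow, so assigning a dict
    # with a "params" key replaces the entire inner dict. Partial updates must
    # therefore re-supply every required key, e.g. (illustrative):
    #
    #     cf.params = HyperbolicKL.bh_tsne(angle=0.8)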

    #######################
    # Parameter templates #
    #######################

    @classmethod
    def exact_tsne(cls):
        """Parameter preset for the exact Hyperbolic tSNE cost function.

        Returns
        -------
        dict
            Cost function params in key-value format.
        """
        return {
            "method": "exact",
            "params": {
                "degrees_of_freedom": 1,
                "skip_num_points": 0,
                "num_threads": _openmp_effective_n_threads(),
                "verbose": False,
                # Assumed defaults (not given in the original preset) so that
                # the preset satisfies check_params, which requires these keys:
                "calc_both": False,
                "area_split": False,
                "grad_fix": False
            }
        }

    @classmethod
    def bh_tsne(cls, angle=0.5):
        """Parameter preset for the accelerated Hyperbolic tSNE cost function.

        Parameters
        ----------
        angle : float, optional
            Trade-off angle (theta) of the Barnes-Hut approximation; smaller
            values are more accurate but slower, by default 0.5.

        Returns
        -------
        dict
            Cost function params in key-value format.
        """
        return {
            "method": "barnes-hut",
            "params": {
                "angle": angle,
                "degrees_of_freedom": 1,
                "num_threads": _openmp_effective_n_threads(),
                "verbose": False,
                # Assumed defaults (not given in the original preset) so that
                # the preset satisfies check_params, which requires these keys:
                "calc_both": False,
                "area_split": False,
                "grad_fix": False
            }
        }
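
    # Usage sketch (illustrative): the presets plug directly into the
    # constructor, e.g.
    #
    #     cf = HyperbolicKL(n_components=2, other_params=HyperbolicKL.bh_tsne())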

    #########################
    # User-facing functions #
    #########################

    def obj(self, Y, *, V):
        """Calculates the Hyperbolic KL Divergence of a given embedding.

        Parameters
        ----------
        Y : ndarray
            Flattened low-dimensional embedding of length n_samples * n_components.
        V : ndarray
            High-dimensional affinity matrix (P matrix in tSNE).

        Returns
        -------
        float
            KL Divergence value.
        """
        n_samples = V.shape[0]
        if self.params["method"] == "exact":
            raise NotImplementedError("Exact obj not implemented. Use obj_grad to get exact cost function value.")
        elif self.params["method"] == "barnes-hut":
            obj, _ = self._obj_bh(Y, V, n_samples)
            return obj

    def grad(self, Y, *, V):
        """Calculates the gradient of the Hyperbolic KL Divergence of 
        a given embedding.

        Parameters
        ----------
        Y : ndarray
            Flattened low-dimensional embedding of length n_samples * n_components.
        V : ndarray
            High-dimensional affinity matrix (P matrix in tSNE).

        Returns
        -------
        ndarray
            Array (n_samples x n_components) with KL Divergence gradient values.
        """
        n_samples = V.shape[0]
        if self.params["method"] == "exact":
            _, grad = self._grad_exact(Y, V, n_samples)
            return grad
        elif self.params["method"] == "barnes-hut":
            _, grad = self._grad_bh(Y, V, n_samples)
            return grad

    def obj_grad(self, Y, *, V):
        """Calculates the Hyperbolic KL Divergence and its gradient 
        of a given embedding.

        Parameters
        ----------
        Y : ndarray
            Flattened low-dimensional embedding of length n_samples * n_components.
        V : ndarray
            High-dimensional affinity matrix (P matrix in tSNE).

        Returns
        -------
        float
            KL Divergence value.
        ndarray
            Flattened array of length n_samples * n_components with KL Divergence gradient values.
        """
        n_samples = V.shape[0]
        if self.params["method"] == "exact":
            obj, grad = self._grad_exact(Y, V, n_samples)
            return obj, grad
        elif self.params["method"] == "barnes-hut":
            obj, grad = self._obj_bh(Y, V, n_samples)
            return obj, grad
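
    # Example (hedged sketch): evaluating cost and gradient on a random
    # embedding. `V` is assumed to be a row-normalized sparse affinity matrix
    # in CSR format, as the private methods below read V.data, V.indices and
    # V.indptr; all names and values here are illustrative.
    #
    #     import scipy.sparse as sp
    #     n = 100
    #     cf = HyperbolicKL(n_components=2, other_params=HyperbolicKL.bh_tsne())
    #     V = sp.random(n, n, density=0.05, format="csr")
    #     V = V / V.sum()  # normalize affinities
    #     Y0 = np.random.uniform(-1e-4, 1e-4, n * cf.n_components)
    #     cost, grad = cf.obj_grad(Y0, V=V)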

    ##########################
    # Main private functions #
    ##########################

    def _obj_exact(self, Y, V, n_samples):
        """Exact computation of the KL Divergence.

        Parameters
        ----------
        Y : ndarray
            Flattened low-dimensional embedding of length n_samples * n_components.
        V : ndarray
            High-dimensional affinity matrix (P matrix in tSNE).
        n_samples : int
            Number of samples in the embedding.
        """
        raise NotImplementedError(
            "Exact obj is not implemented. Use obj_grad to get the exact cost function value."
        )

    def _grad_exact(self, Y, V, n_samples, save_timings=True):
        """Exact computation of the KL Divergence gradient.

        Parameters
        ----------
        Y : ndarray
            Flattened low-dimensional embedding of length n_samples * n_components.
        V : ndarray
            High-dimensional affinity matrix (P matrix in tSNE).
        n_samples : int
            Number of samples in the embedding.
        save_timings : bool, optional
            If True, saves per iteration times, by default True.

        Returns
        -------
        float
            KL Divergence value.
        ndarray
            Flattened array of length n_samples * n_components with KL Divergence gradient values.
        """
        Y = Y.astype(ctypes.c_double, copy=False)
        Y = Y.reshape(n_samples, self.n_components)

        val_V = V.data
        neighbors = V.indices.astype(np.int64, copy=False)
        indptr = V.indptr.astype(np.int64, copy=False)

        grad = np.zeros(Y.shape, dtype=ctypes.c_double)
        timings = np.zeros(4, dtype=ctypes.c_float)
        error = gradient(
            timings,
            val_V, Y, neighbors, indptr, grad,
            0.5,  # angle; presumably ignored when exact=True
            self.n_components,
            self.params["params"]["verbose"],
            dof=self.params["params"]["degrees_of_freedom"],
            compute_error=True,
            num_threads=self.params["params"]["num_threads"],
            exact=True,
            grad_fix=self.params["params"]["grad_fix"]
        )

        grad = grad.ravel()
        grad *= 4  # the t-SNE KL gradient carries a constant factor of 4

        if save_timings:
            self.results.append(timings)

        return error, grad

    def _obj_bh(self, Y, V, n_samples):
        """Approximate computation of the KL Divergence.

        Parameters
        ----------
        Y : ndarray
            Flattened low-dimensional embedding of length n_samples * n_components.
        V : ndarray
            High-dimensional affinity matrix (P matrix in tSNE).
        n_samples : int
            Number of samples in the embedding.

        Returns
        -------
        float
            KL Divergence value.
        ndarray
            Flattened array of length n_samples * n_components with KL Divergence gradient values.
        """
        return self._grad_bh(Y, V, n_samples)

    def _grad_bh(self, Y, V, n_samples, save_timings=True):
        """Approximate computation of the KL Divergence gradient.

        Parameters
        ----------
        Y : ndarray
            Flattened low-dimensional embedding of length n_samples * n_components.
        V : ndarray
            High-dimensional affinity matrix (P matrix in tSNE).
        n_samples : int
            Number of samples in the embedding.
        save_timings : bool, optional
            If True, saves per iteration times, by default True.

        Returns
        -------
        float
            KL Divergence value.
        ndarray
            Flattened array of length n_samples * n_components with KL Divergence gradient values.
        """
        Y = Y.astype(ctypes.c_double, copy=False)
        Y = Y.reshape(n_samples, self.n_components)

        val_V = V.data
        neighbors = V.indices.astype(np.int64, copy=False)
        indptr = V.indptr.astype(np.int64, copy=False)

        grad = np.zeros(Y.shape, dtype=ctypes.c_double)
        timings = np.zeros(4, dtype=ctypes.c_float)
        error = gradient(
            timings,
            val_V, Y, neighbors, indptr, grad,
            self.params["params"]["angle"],
            self.n_components,
            self.params["params"]["verbose"],
            dof=self.params["params"]["degrees_of_freedom"],
            compute_error=True,
            num_threads=self.params["params"]["num_threads"],
            exact=False,
            area_split=self.params["params"]["area_split"],
            grad_fix=self.params["params"]["grad_fix"]
        )

        grad = grad.ravel()
        grad *= 4  # the t-SNE KL gradient carries a constant factor of 4

        if save_timings:
            self.results.append(timings)

        return error, grad