Origin: https://github.com/msmathcomp/hyperbolic-tsne (snapshot taken 03 May 2024, 17:35:23 UTC)
File: hyperbolicTSNE/optimizer_.py
Permalinks (SWHIDs):
  content:   swh:1:cnt:182e8969ca3964c342038978c84168867c0434ce
  directory: swh:1:dir:649e6af77a57c87647798bf148756724cab0ab03
  revision:  swh:1:rev:bba9d0f089659fb170c7270aa90c796f91bfb2b1
  snapshot:  swh:1:snp:9ac5f55368e393bdd437f6b15aab9fdb4f438510
Tip revision: bba9d0f089659fb170c7270aa90c796f91bfb2b1 ("Update README.md"), authored by Martin Skrodzki on 02 May 2024, 12:34:19 UTC

""" Collection of Optimizers for high dimensional data embedding optimization

This file collects several classes, each representing an optimizer to be used in the context of high dimensional
data embedding optimization. Each class is derived from the BaseOptimizer class and therefore has to implement the
following methods:
 - __init__ # Constructs this Optimizer with some initial embedding and a similarity matrix
 - available_params # Pretty print of the parameters this optimizer expects
 - check_params # Checks a given dict whether all necessary parameters are present
 - run # Computes the embedding Y according to the set similarity matrix
"""

import numpy as np
from time import time

from .solver_ import gradient_descent
from .cost_functions_ import HyperbolicKL


def check_params(params):
    # Cost function params
    # 1. Check that all necessary params are available
    if "cf" not in params or "cf_config_params" not in params or "cf_params" not in params:
        raise Exception(
            "`other_params` should include a BaseCostFunction object `cf`, its config parameters "
            "`cf_config_params` and its runtime params `cf_params`")
    # 2. cf_config_params and cf_params should be either None or dict
    if params["cf_config_params"] is not None and type(params["cf_config_params"]) is not dict:
        raise Exception("`cf_config_params` should be either None or a dict with the appropriate setup parameters")
    if params["cf_params"] is not None and type(params["cf_params"]) is not dict:
        raise Exception("`cf_params` should be either None or a dict with the appropriate runtime parameters")

    # Global iteration counter params
    if "solver_its_done" not in params:
        params["solver_its_done"] = 0
    else:
        if type(params["solver_its_done"]) != int:
            raise Exception("solver_its should be an integer equal or larger than 0 (if starting from iteration "
                            "0th)")
        else:
            if params["solver_its_done"] < 0:
                raise Exception("solver_its should be an integer equal or larger than 0 (if starting from "
                                "iteration 0th), indicating the solver's starting counting point")

    # Sequence params
    if "sequence" not in params:
        raise Exception("Missing the `sequence` parameter which is an array of solvers and processors. Please "
                        "check the presets for more information.")
    if len(params["sequence"]) == 0:
        print("Warning: there are no blocks in the sequence, therefore the optimizer will not have an effect on "
                "the embedding,")
    for e in params["sequence"]:
        if "type" not in e or "function" not in e or "params" not in e:
            raise Exception("Each block of the sequence must be a dict with the `type`, `function` and `params` "
                            "entries. Please check the presets for more information.")
        if e["type"] not in ["processor", "solver"]:
            raise Exception("`type` of sequence's block in SequentialOptimizer was not recognized.")
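

def _example_other_params():
    """Illustrative sketch, not part of the original module: builds a minimal `other_params`
    dict that passes `check_params`, using the preset helpers of `SequentialOptimizer` defined
    below. The iteration count is an arbitrary placeholder, not a recommended setting."""
    template = SequentialOptimizer.empty_sequence()
    template["sequence"] = SequentialOptimizer.add_block_gradient_descent_with_rescale_and_gradient_mask(
        template["sequence"], gradientDescent_its=100
    )
    check_params(template)
    return template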


class SequentialOptimizer:

    def __init__(self, *, Y0, V, n_components, other_params=None, verbose=0):
        """
        Constructor of a sequential optimizer which executes several blocks sequentially.
        """
        if other_params is None:
            raise Exception("No `other_params` specified for SequentialOptimizer, please add your params or select "
                            "one of the presets.")
        
        self._y0 = Y0
        self.Y = Y0
        self.V = V
        self.n_samples = self.V.shape[0]
        self.n_components = n_components
        self._params = None  # Reserves a field for params
        self.params = other_params  # Calls the custom setter to fill the field
        self.verbose = verbose
        self.cf_val = np.inf

        self.cf = self.params["cf"](n_components=self.n_components,
                                    other_params=self.params["cf_config_params"])

    @property
    def Y(self):
        return self._y

    @Y.setter
    def Y(self, mat):
        try:
            self._y = mat.reshape(-1, self.n_components)
        except AttributeError:
            self._y = mat

    @property
    def V(self):
        return self._V

    @V.setter
    def V(self, mat):
        self._V = mat

    @property
    def params(self):
        return self._params

    @params.setter
    def params(self, other_params):
        if not isinstance(self._params, dict):
            default_params = {}
        else:
            default_params = self._params
        if type(other_params) is not dict:
            raise Exception("`other_params` should be a dict with the optimizer's parameters")
        else:
            for k, v in other_params.items():
                default_params[k] = v
            check_params(default_params)
            self._params = default_params

    def run(self):
        """ Method that implements the optimization process of this particular optimizer. Inputs are the low-dimensional
        points and the high-dimensional similarity matrix. Output is the time that it took to run the whole loop.
        Attributes in class contain latest values.
        Parameters
        ----------
        Returns
        -------
        Y : numpy array
            Embedding of the high-dimensional data.
        cf_val : float
            Cost function value of the embedding.
        time : float
            Time taken to compute the embedding.
        its_done : int
            Number of iterations performed by the optimizer's solvers.
        """
        # Start: logging
        logging_dict = self.params.get("logging_dict", None)
        if logging_dict is not None:
            logging_dict["log_arrays"] = logging_dict.get("log_arrays", False)
            if logging_dict["log_arrays"]:
                logging_dict["log_arrays_ids"] = logging_dict.get("log_arrays_ids", None)
        solver_counter = 0
        # end: logging
        start = time()
        for se in self.params["sequence"]:
            if se["type"] == "processor":
                self.Y, self.V = se["function"](self.Y, self.V, self.cf, **se["params"])
            elif se["type"] == "solver":
                se["params"]["start_it"] = self.params["solver_its_done"]
                se["params"]["verbose"] = self.verbose
                # Start: logging
                self.Y, self.cf_val, its = se["function"](
                    self.Y, self.cf, dict({"V": self.V}, **self.params["cf_params"]),
                    logging_key="sequential_opt_" + str(solver_counter),
                    logging_dict=logging_dict, **se["params"]
                )
                solver_counter += 1
                # End: logging
                self.params["solver_its_done"] = se["params"]["start_it"] + its
        end = time()

        return self.Y, self.cf_val, end - start, self.params["solver_its_done"]

    # Processors: receive (Y, V, cf, **params) and output Y, V
    @staticmethod
    def list_available_processors():
        # Pretty prints the available processors in this optimizer
        print("# SequentialOptimizer processors")
        print("================================")
        print("All processors receive (Y, V, cf, **params) and output Y, V.")
        print("Available are:")
        print(" - `exaggerate_matrix`: Multiplies the V matrix with a given `exaggeration` parameter.")
        print("- `scale_to_optimal_size`: Performs a binary search for an optimal scale parameter, with search depth "
              "`binary_search_depth` to find the best scale for the given Y and cost function.")
        print("==============================================================")

    # noinspection PyUnusedLocal
    @staticmethod
    def exaggerate_matrix(Y, V, cost_function, exaggeration=1.0):
        """ Multiply the given `V` matrix with the given `exaggeration` scalar and return the result.
        Parameters
        ----------
        Y : numpy array
            Embedding of the high-dimensional data set
        V : csr_matrix
            Probability distribution of the data in high-dimensional space
        cost_function : instance of a BaseCostFunction
            Cost function object instance that can compute the objective and gradient
        exaggeration : float, optional (default=1.0)
            Value to multiply the `V` matrix with.
        Returns
        -------
        Y, V : numpy array, csr_matrix
            Unchanged Y and result of `V * exaggeration`
        """
        return Y, V * exaggeration
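
    # Illustrative sketch, not part of the original class: a custom processor following the
    # documented contract of receiving (Y, V, cf, **params) and returning Y, V. It leaves both
    # unchanged and only prints the largest embedding norm, e.g. to monitor how far points have
    # drifted in a Poincare embedding. The name and `label` parameter are hypothetical.
    # noinspection PyUnusedLocal
    @staticmethod
    def _example_logging_processor(Y, V, cost_function, label=""):
        print(f"[{label}] max embedding norm: {np.linalg.norm(Y, axis=1).max():.4f}")
        return Y, V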

    # Sequence param presets and blocks
    @classmethod
    def empty_sequence(cls, cf=HyperbolicKL, cf_config_params=None, cf_params=None):
        """Empty sequence of the optimizer, serves as a basic template to be appended to.
        Parameters
        ----------
        cf : function or callable, optional
            Class that implements the method's obj, grad, hess. These methods should return a float, vector and matrix,
            respectively.
        cf_config_params : dict, optional (default: None)
            Additional params that configure the cost function class, e.g., the angle for the KLDivergence.
        cf_params : dict, optional (default: None)
            Additional params that should be passed to the obj, grad, hess functions of cf. For example, the
            high-dimensional matrix V.
        Returns
        -------
        template : dict
            A dict wrapping the cost function, its configuration parameters, further cost function parameters,
            as well as an empty list with potential sequence blocks to be performed.
        """
        print("Please note that `empty_sequence` uses the KL divergence with Barnes-Hut approximation (angle=0.5) by default.")
        if cf_config_params is None:
            cf_config_params = HyperbolicKL.bh_tsne()
        if cf_params is None:
            cf_params = {}
        template = {"cf": cf, "cf_config_params": cf_config_params, "cf_params": cf_params, "sequence": []}
        return template


    @classmethod
    def sequence_poincare(cls, exaggeration_its=250, exaggeration=12, gradientDescent_its=750,
                          n_iter_check=np.inf, threshold_cf=0., threshold_its=-1, threshold_check_size=-1, size_tol=None,
                          learning_rate_ex=0.1, learning_rate_main=0.1, momentum_ex=0.5, momentum=0.8, vanilla=False, exact=True, calc_both=False, angle=0.5,
                          area_split=False, grad_fix=False, grad_scale_fix=False):
        # Start with an empty sequence
        cf_config_params = HyperbolicKL.exact_tsne() if exact else HyperbolicKL.bh_tsne()
        cf_config_params["params"]["calc_both"] = calc_both
        cf_config_params["params"]["area_split"] = area_split
        cf_config_params["params"]["grad_fix"] = grad_fix

        if not exact:
            cf_config_params["params"]["angle"] = angle

        template = SequentialOptimizer.empty_sequence(cf=HyperbolicKL, cf_config_params=cf_config_params)

        # Add the blocks necessary for early exaggeration
        template["sequence"] = SequentialOptimizer.add_block_early_exaggeration(
            template["sequence"], earlyExaggeration_its=exaggeration_its, momentum=momentum_ex, learning_rate=learning_rate_ex,
            exaggeration=exaggeration, n_iter_check=n_iter_check,
            threshold_cf=threshold_cf, threshold_its=threshold_its, threshold_check_size=threshold_check_size, grad_scale_fix=grad_scale_fix
        )

        template["sequence"][-2]["params"]["vanilla"] = vanilla
        template["sequence"][-2]["params"]["size_tol"] = size_tol

        # Add the block for burn in
        # template["sequence"] = SequentialOptimizer.add_block_gradient_descent_with_rescale_and_gradient_mask(
        #     template["sequence"], gradientDescent_its=10, n_iter_check=n_iter_check,
        #     threshold_cf=threshold_cf, threshold_its=threshold_its, threshold_check_size=threshold_check_size,
        #     learning_rate=learning_rate_main / 10, momentum=momentum, vanilla=vanilla
        # )
        #
        # template["sequence"][-1]["params"]["vanilla"] = vanilla

        # Add the block for gradient descent
        template["sequence"] = SequentialOptimizer.add_block_gradient_descent_with_rescale_and_gradient_mask(
            template["sequence"], gradientDescent_its=gradientDescent_its, n_iter_check=n_iter_check,
            threshold_cf=threshold_cf, threshold_its=threshold_its, threshold_check_size=threshold_check_size,
            learning_rate=learning_rate_main, momentum=momentum, vanilla=vanilla, grad_scale_fix=grad_scale_fix
        )

        template["sequence"][-1]["params"]["size_tol"] = size_tol        

        return template

    @classmethod
    def add_block_early_exaggeration(
            cls, sequence, earlyExaggeration_its, momentum=0.5, learning_rate=200.0, exaggeration=12.0, rescale=None,
            n_iter_rescale=np.inf, gradient_mask=np.ones, n_iter_check=np.inf,
            threshold_cf=0., threshold_its=-1, threshold_check_size=-1, vanilla=True, grad_scale_fix=False, verbose_solver=0
    ):
        """A block to perform early exaggeration.
        Parameters
        ----------
        sequence : List
            A list of processing blocks to be sequentially performed by the optimizer (can be an empty list).
        earlyExaggeration_its : int
            A positive number of early exaggeration steps to be performed by the solver.
        momentum : float, optional (default=0.5)
            Momentum for the gradient descent procedure.
        learning_rate : float, optional (default=200.0)
            Learning rate for the gradient descent procedure.
        exaggeration : float, optional (default=12.0)
            The value with which to multiply the V matrix before performing earlyExaggeration_its many gradient descent
            steps.
        rescale : float, optional (default: None)
            Rescale the embedding to have a bbox diagonal of this value whenever rescaling, if None, no rescaling is
            performed.
        n_iter_rescale : int, optional (default: np.inf)
            Every `n_iter_rescale` iterations, the bbox diagonal of the embedding is scaled to have length `rescale`.
        gradient_mask : numpy vector, optional (default: np.ones)
            During each solver iteration, apply the gradient descent update only to those elements that have an entry
            "1" in the gradient mask. Other entries should be "0".
        n_iter_check : int, optional (default: inf)
            Number of iterations before evaluating the global error. If the error is sufficiently low, we abort the
            optimization.
        threshold_cf : float, optional (default: 0.)
            A positive number, if the cost function goes below this, the solver stops.
        threshold_its : int, optional (default: -1)
            A positive number, if the solver performs this number of iterations, it stops.
        threshold_check_size : float, optional (default: -1)
            A positive number, providing the size to which to scale the current embedding when checking its error.
        vanilla : bool, optional (default: True)
            If True, then vanilla gradient descent with a constant learning rate is used, i.e., without momentum.
        verbose_solver : int, optional (default: 0)
            A positive or zero integer, indicating how verbose the solver should be.
        Returns
        -------
        sequence : List
            The given sequence of blocks with three additional blocks appended, representing the exaggeration of the V
            matrix, the gradient descent steps, and the de-exaggeration of the V matrix.
        """
        sequence.append({
                    "type": "processor", "function": SequentialOptimizer.exaggerate_matrix,
                    "params": {"exaggeration": exaggeration}
                })
        sequence.append({
                    "type": "solver", "function": gradient_descent,
                    "params": {
                        "n_iter": earlyExaggeration_its, "momentum": momentum, "learning_rate": learning_rate,
                        "rescale": rescale, "n_iter_rescale": n_iter_rescale, "gradient_mask": gradient_mask,
                        "n_iter_check": n_iter_check, "threshold_cf": threshold_cf, "threshold_its": threshold_its,
                        "threshold_check_size": threshold_check_size, "verbose": verbose_solver, "vanilla": vanilla,
                        "grad_scale_fix": grad_scale_fix,
                    }
                })
        sequence.append({
                    "type": "processor", "function": SequentialOptimizer.exaggerate_matrix,
                    "params": {"exaggeration": 1/exaggeration}
                })
        return sequence

    @classmethod
    def add_block_gradient_descent_with_rescale_and_gradient_mask(
            cls, sequence, gradientDescent_its, momentum=0.8, learning_rate=200.0, rescale=None, n_iter_rescale=np.inf,
            gradient_mask=np.ones, n_iter_check=np.inf, threshold_cf=0, threshold_its=-1, threshold_check_size=-1.,
            verbose_solver=0, vanilla=False, grad_scale_fix=False,
    ):
        """A block to perform a specified number of gradient descent steps.
        Parameters
        ----------
        sequence : List
            A list of processing blocks to be sequentially performed by the optimizer (can be an empty list).
        gradientDescent_its : int
            A positive number of gradient descent steps to be performed by the solver.
        momentum : float, optional (default=0.8)
            Momentum for the gradient descent procedure.
        learning_rate : float, optional (default=200.0)
            Learning rate for the gradient descent procedure.
        rescale : float, optional (default: None)
            Rescale the embedding to have a bbox diagonal of this value whenever rescaling, if None, no rescaling is
            performed.
        n_iter_rescale : int, optional (default: np.inf)
            Every `n_iter_rescale` iterations, the bbox diagonal of the embedding is scaled to have length `rescale`.
        gradient_mask : numpy vector, optional (default: np.ones)
            During each solver iteration, apply the gradient descent update only to those elements that have an entry
            "1" in the gradient mask. Other entries should be "0".
        n_iter_check : int, optional (default: inf)
            Number of iterations before evaluating the global error. If the error is sufficiently low, we abort the
            optimization.
        threshold_cf : float, optional (default: 0.)
            A positive number, if the cost function goes below this, the solver stops.
        threshold_its : int, optional (default: -1)
            A positive number, if the solver performs this number of iterations, it stops.
        threshold_check_size : float, optional (default: -1)
            A positive number, providing the size to which to scale the current embedding when checking its error.
        vanilla : bool, optional (default: False)
            If True, then vanilla gradient descent with a constant learning rate is used.
            Vanilla gradient descent does not use momentum or other tricks to accelerate convergence.
        verbose_solver : int, optional (default: 0)
            A positive or zero integer, indicating how verbose the solver should be.
        Returns
        -------
        sequence : List
            The given sequence of blocks with one additional block appended, representing the gradient descent
            iterations.
        """
        sequence.append(
            {
                    "type": "solver", "function": gradient_descent,
                    "params": {
                        "n_iter": gradientDescent_its, "momentum": momentum, "learning_rate": learning_rate,
                        "rescale": rescale, "n_iter_rescale": n_iter_rescale, "gradient_mask": gradient_mask,
                        "n_iter_check": n_iter_check, "threshold_cf": threshold_cf, "threshold_its": threshold_its,
                        "threshold_check_size": threshold_check_size, "verbose": verbose_solver, "vanilla": vanilla,
                        "grad_scale_fix": grad_scale_fix,
                    }
            })
        return sequence

    @classmethod
    def available_params(cls):
        # Pretty prints the available params in this optimizer
        print("# SequentialOptimizer parameters (root of `other_params` dict)")
        print("==============================================================")
        print("'sequence': [")
        print("    {'type': 'processor', 'function': processorFunction, 'params': {params for processorFunction}}")
        print("    {'type': 'solver', 'function': solverFunction, 'params': {params for solverFunction}}")
        print("    ...")
        print("]")
        print("==============================================================")
