https://github.com/msmathcomp/hyperbolic-tsne
03 May 2024, 17:35:23 UTC
477dbb2 / hyperbolicTSNE / data_loaders.py
Tip revision: bba9d0f089659fb170c7270aa90c796f91bfb2b1 ("Update README.md"), authored by Martin Skrodzki on 02 May 2024, 12:34:19 UTC
""" Facilities for loading the datasets used in the paper.
All methods return a numpy matrix X and a labels vector Y, if available.
"""
import os
from pathlib import Path
import time
import gzip
from enum import Enum, auto

from sklearn.decomposition import PCA, TruncatedSVD
import numpy as np
import scipy.sparse

from .hd_mat_ import hd_matrix, check_knn_method, check_hd_method, get_n_neighbors


# Types of dataset interfaces in scikit-learn:
# - Dataset loaders
# - Dataset fetchers
# - Dataset generators

# These methods return a dict with at least two elements:
# - data: array of shape n_samples * n_features
# - target: array of shape n_samples
# if return_X_y == True, methods return (X, y)
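#
# An illustrative sketch of that convention using scikit-learn's load_digits
# (not used elsewhere in this file; shapes are those documented by scikit-learn):
#
#     from sklearn.datasets import load_digits
#     bunch = load_digits()                  # dict-like with .data and .target
#     X, y = load_digits(return_X_y=True)    # X: (1797, 64), y: (1797,)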


class Datasets(Enum):
    MNIST = auto()  # DONE
    MYELOID = auto()  # DONE
    PLANARIA = auto()  # DONE
    PAUL = auto()  # DONE
    C_ELEGANS = auto()  # DONE
    LUKK = auto()  # DONE
    MYELOID8000 = auto()  # DONE
    WORDNET = auto()  # DONE
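
# Illustrative dispatch example (an assumed call, not executed here): each enum
# member maps to a matching loader in `_load_dataset` further below, e.g.
#
#     X, y = _load_dataset(Datasets.PLANARIA, data_home="path/to/datasets")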


def load_mnist(data_home=None, return_X_y=True, kind='all'):
    """
    Loads different versions of the MNIST dataset. The function was taken from
    https://github.com/zalandoresearch/fashion-mnist/blob/master/utils/mnist_reader.py

    Parameters
    __________
    data_home : str, optional
        Location of the folder where the datasets are stored.
    return_X_y : bool, optional
        If True, the method only returns a tuple with the data and its labels.
    kind : str, optional
        Defines whether the training set (60,000 points), the test set (10,000 points),
        or both ('all', the default) are loaded.
    """

    # Use default location
    if data_home is None:
        data_home = os.path.join(os.path.dirname(__file__), 'datasets')

    full_path = os.path.join(data_home, 'mnist')

    labels_path_train = os.path.join(full_path, 'train-labels-idx1-ubyte.gz')

    labels_path_test = os.path.join(full_path, 't10k-labels-idx1-ubyte.gz')

    images_path_train = os.path.join(full_path, 'train-images-idx3-ubyte.gz')

    images_path_test = os.path.join(full_path, 't10k-images-idx3-ubyte.gz')

    labels_arr = []
    images_arr = []

    if kind == 'all' or kind == 'train':
        with gzip.open(labels_path_train, 'rb') as lbpath:
            br = np.frombuffer(lbpath.read(), dtype=np.uint8, offset=8)
            labels_arr.append(br)

    if kind == 'all' or kind == 'test':
        with gzip.open(labels_path_test, 'rb') as lbpath:
            br = np.frombuffer(lbpath.read(), dtype=np.uint8, offset=8)
            labels_arr.append(br)

    if kind == 'all' or kind == 'train':
        with gzip.open(images_path_train, 'rb') as imgpath:
            br = np.frombuffer(imgpath.read(), dtype=np.uint8, offset=16)
            images = br.reshape(len(labels_arr[0]), 784)
            images_arr.append(images)

    if kind == 'all' or kind == 'test':
        with gzip.open(images_path_test, 'rb') as imgpath:
            br = np.frombuffer(imgpath.read(), dtype=np.uint8, offset=16)
            images = br.reshape(len(labels_arr[-1]), 784)  # last appended labels block (works for kind='all' and kind='test')
            images_arr.append(images)

    labels = np.concatenate(labels_arr, axis=0)
    images = np.concatenate(images_arr, axis=0)

    if return_X_y:
        return images, labels
    else:
        return images
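
# A minimal usage sketch for load_mnist (illustrative, not executed on import;
# it assumes the gzipped MNIST IDX files listed above are present under <data_home>/mnist):
#
#     X, y = load_mnist(kind='train')   # X: (60000, 784) uint8, y: (60000,)
#     X, y = load_mnist(kind='all')     # stacks train and test: (70000, 784)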


def load_c_elegans(data_home=None, return_X_y=True):
    """
    Loads C-ELEGANS data available at https://data.caltech.edu/records/1945 

    Parameters
    __________
    data_home : str, optional
        Location of the folder where the datasets are stored.
    return_X_y : bool, optional
        If True, the method only returns a tuple with the data and its labels.
    """    
    import anndata as ad

    # Use default location
    if data_home is None:
        data_home = Path.joinpath(Path(__file__).parent, "datasets")
    else:
        data_home = Path(str(data_home))  # quick fix to deal with incoming os.paths

    full_path = Path.joinpath(data_home, "c_elegans")

    ad_obj = ad.read_h5ad(str(Path.joinpath(full_path, "packer2019.h5ad")))
    X = ad_obj.X

    labels_str = np.array(ad_obj.obs.cell_type)

    _, labels = np.unique(labels_str, return_inverse=True)

    if return_X_y:
        return X, labels
    else:
        return X


def load_myeloid(data_home=None, return_X_y=True):
    """
    Loads MYELOID data.

    Parameters
    __________
    data_home : str, optional
        Location of the folder where the datasets are stored.
    return_X_y : bool, optional
        If True, the method only returns a tuple with the data and its labels.
    """

    # Use default location
    if data_home is None:
        data_home = Path.joinpath(Path(__file__).parent, "datasets")
    else:
        data_home = Path(str(data_home))  # quick fix to deal with incoming os.paths

    full_path = Path.joinpath(data_home, "myeloid-progenitors")

    X = np.loadtxt(str(Path.joinpath(full_path, "MyeloidProgenitors.csv")), delimiter=",", skiprows=1, usecols=np.arange(11))

    labels_str = np.loadtxt(str(Path.joinpath(full_path, "MyeloidProgenitors.csv")), delimiter=",", skiprows=1, usecols=11, dtype=str)

    _, labels = np.unique(labels_str, return_inverse=True)

    if return_X_y:
        return X, labels
    else:
        return X


def load_myeloid8000(data_home=None, return_X_y=True):
    """
    Loads MYELOID 8000 data.

    Parameters
    __________
    data_home : str, optional
        Location of the folder where the datasets are stored.
    return_X_y : bool, optional
        If True, the method only returns a tuple with the data and its labels.
    """

    # Use default location
    if data_home is None:
        data_home = Path.joinpath(Path(__file__).parent, "datasets")
    else:
        data_home = Path(str(data_home))  # quick fix to deal with incoming os.paths

    full_path = Path.joinpath(data_home, "myeloid8000")

    X = np.loadtxt(str(Path.joinpath(full_path, "myeloid_8000.csv")), delimiter=",")

    labels_str = np.loadtxt(str(Path.joinpath(full_path, "myeloid_8000_labels.csv")), delimiter=",", dtype=str)

    _, labels = np.unique(labels_str, return_inverse=True)

    if return_X_y:
        return X, labels
    else:
        return X


def load_planaria(data_home=None, return_X_y=True):
    """
    Loads PLANARIA data available at https://shiny.mdc-berlin.de/psca/

    Parameters
    __________
    data_home : str, optional
        Location of the folder where the datasets are stored.
    return_X_y : bool, optional
        If True, the method only returns a tuple with the data and its labels.
    """

    # Use default location
    if data_home is None:
        data_home = Path.joinpath(Path(__file__).parent, "datasets")
    else:
        data_home = Path(str(data_home))  # quick fix to deal with incoming os.paths

    full_path = Path.joinpath(data_home, "planaria")

    X = np.loadtxt(str(Path.joinpath(full_path, "R_pca_seurat.txt")), delimiter="\t")

    labels_str = np.loadtxt(str(Path.joinpath(full_path, "R_annotation.txt")), delimiter=",", dtype=str)
    _, labels = np.unique(labels_str, return_inverse=True)

    if return_X_y:
        return X, labels
    else:
        return X


def load_wordnet(data_home=None, return_X_y=True):
    """
    Loads WORDNET data.

    Parameters
    __________
    data_home : str, optional
        Location of the folder where the datasets are stored.
    return_X_y : bool, optional
        If True, the method only returns a tuple with the data and its labels.
    """
    import torch

    # Use default location
    if data_home is None:
        data_home = Path.joinpath(Path(__file__).parent, "datasets")
    else:
        data_home = Path(str(data_home))  # quick fix to deal with incoming os.paths

    full_path = Path.joinpath(data_home, "wordnet")

    model = torch.load(str(Path.joinpath(full_path, "nouns.bin.best")))

    X = np.array(model["embeddings"])

    labels_str = np.array(model["objects"])
    _, labels = np.unique(labels_str, return_inverse=True)

    if return_X_y:
        return X, labels
    else:
        return X


def load_lukk(data_home=None, return_X_y=True):
    """
    Loads LUKK data.

    Parameters
    __________
    data_home : str, optional
        Location of the folder where the datasets are stored.
    return_X_y : bool, optional
        If True, the method only returns a tuple with the data and its labels.
    """

    # Use default location
    if data_home is None:
        data_home = Path.joinpath(Path(__file__).parent, "datasets")
    else:
        data_home = Path(str(data_home))  # quick fix to deal with incoming os.paths

    full_path = Path.joinpath(data_home, "lukk")

    if (x := Path.joinpath(full_path, "lukk_x.npy")).exists() and (y := Path.joinpath(full_path, "lukk_y.npy")).exists():
        return np.load(str(x)), np.load(str(y))

    import pandas as pd
    sample_data_rel = pd.read_csv(
        str(Path.joinpath(full_path, "E-MTAB-62.sdrf.txt")),
        sep='\t',
        index_col='Source Name'
    )

    affymetrix = (
        pd.read_csv(
            str(Path.joinpath(full_path, "E-MTAB-62.processed.2.zip")),
            sep='\t',
            index_col='Hybridization REF',
            dtype='object',
            engine='python'
        )
        .drop('CompositeElement REF')
        .astype('float32')
        .T
        .loc[sample_data_rel.index]
    )

    X = affymetrix.values

    labels_str = sample_data_rel['Factor Value[4 groups from blood to incompletely diff]'].values
    # labels_str = sample_data_rel['Characteristics[4 meta-groups]'].values

    _, labels = np.unique(labels_str, return_inverse=True)

    labels = labels.astype(int)
    if return_X_y:
        return X, labels
    else:
        return X


def _load_dataset(dataset, data_home=None, verbose=False, **kwargs):
    X = None
    labels = None
    if verbose:
        print("[Data Loader] Preparing to load the dataset")
    start = time.time()
    if dataset == Datasets.MNIST:
        X, labels = load_mnist(data_home, **kwargs)
    if dataset == Datasets.MYELOID:
        X, labels = load_myeloid(data_home, **kwargs)
    if dataset == Datasets.MYELOID8000:
        X, labels = load_myeloid8000(data_home, **kwargs)
    if dataset == Datasets.PLANARIA:
        X, labels = load_planaria(data_home, **kwargs)
    if dataset == Datasets.C_ELEGANS:
        X, labels = load_c_elegans(data_home, **kwargs)
    if dataset == Datasets.LUKK:
        X, labels = load_lukk(data_home, **kwargs)
    if dataset == Datasets.WORDNET:
        X, labels = load_wordnet(data_home, **kwargs)
    end = time.time()
    if verbose:
        print("[Data Loader] Data has been loaded and it took {}".format(end - start))
    return X, labels


def load_data(dataset, data_home=None, to_return='all', pca_components=100,
              knn_method="sklearn", metric="euclidean", n_neighbors=None, knn_params=None,
              hd_method='vdm2008', hd_params=None,
              sample=-1, random_state=42, verbose=False, **kwargs):
    """
    Loads the selected dataset.

    Parameters
    __________
    dataset : Datasets
        The selected dataset out of the available ones.
    to_return : str, optional
        Separate the quantities you want to obtain with underscores; possible options are
        `X`, `labels`, `D`, `V`, e.g., `X_labels`. Use `all` to obtain all quantities.
    pca_components : int, optional
        Number of components to take out of the PCA representation of X to build P.
        If 0, PCA is not applied and the full versions are returned.
        If >0, a reduced dataset X and its corresponding matrices are returned.
        The number of dimensions of this dataset is min(X.shape[1], pca_components).
    method : str, optional
        Method to use when computing V.
        If 'exact', a point is compared against all the others.
        If 'sparse', only its nearest neighbors are used and a sparse (csr) matrix is returned.
    sample : int or float, optional
        Size of the sample to produce.
        If 0 < sample < 1, it denotes the fraction of the data to return.
        If sample >= 1, it denotes the number of entries of X to return.
    random_state : int, optional
        Sets the random state to generate consistent samples.
        If not positive, the random seed is not set.
    kwargs : dict
        Args to be used in specific loading methods.

    Returns
    _______
    X : ndarray
        Matrix of the data of the selected dataset.
    labels : ndarray, optional
        Array with the labels of each observation.
    D : ndarray, optional
        Matrix of distances in high-dimensional space.
    V : ndarray, optional
        Matrix of similarities.
        Can be either a ndarray in squareform (if dense) or a sparse csr matrix.
    sample_idx : ndarray, optional
        List of sampling indices.
    """
    if random_state > 0:
        np.random.seed(random_state)
    D_filepath = None
    V_filepath = None

    # Load selected dataset
    if data_home is None:
        data_home = os.path.join(os.path.dirname(__file__), "datasets")
    raw_X, raw_labels = _load_dataset(dataset, data_home, verbose=verbose, **kwargs)

    # Setup sample
    sample_idx = None
    if sample <= 0:
        sample_data = False
        X, labels = raw_X, raw_labels
    else:
        sample_data = True
        if 0 < sample < 1:  # sample a fraction of the data
            sample = round(sample * raw_X.shape[0])
        # Generate sample
        raw_idx = np.arange(raw_X.shape[0])
        sample_idx = np.sort(np.random.choice(raw_idx, size=sample, replace=False))  # uniformly sampled
        X = raw_X[sample_idx].copy()
        labels = raw_labels[sample_idx].copy()

    # Preprocess data by reducing its dimensionality
    if "X" in to_return or "D" in to_return or "V" in to_return:
        if pca_components > 0:
            pca_components = np.min([pca_components, X.shape[0], X.shape[1]])
            if scipy.sparse.isspmatrix(X):
                if verbose:
                    print("[Data Loader] Input matrix X is sparse ... using sparse PCA")
                pca = TruncatedSVD(n_components=pca_components, random_state=random_state)
                X = pca.fit_transform(X)
            else:
                if verbose:
                    print("[Data Loader] Input matrix X is dense ... using dense PCA")
                pca = PCA(n_components=pca_components, svd_solver="randomized", random_state=random_state)  # remember random state
                X = pca.fit_transform(X)

    X = X.astype(np.float32, copy=False)

    to_return = "labels_X_D_V" if to_return == "all" else to_return
    to_return = to_return.split("_")
    out = []

    if verbose:
        print("[Data Loader] The following elements will be returned: {}".format(", ".join(to_return)))

    if "X" in to_return:
        out.append(X)
    if "labels" in to_return:
        out.append(labels)
    if "D" in to_return or "V" in to_return:
        D = None
        V = None
        # Here, the V matrix is the high-dimensional matrix used by each method.
        # For example, in t-SNE, this matrix is the "P" matrix.
        # A V matrix file has the following convention: Vmat-dataset-method-matrix_type-pca_components-other_params.npz
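        # Illustrative example of a generated name (the parameter values shown are
        # hypothetical; the actual filename is assembled by V_filename below):
        #   Vmat-dataset$MNIST-pca$100-knn_method$sklearn-n_neighbors$90-metric$euclidean-hd_method$vdm2008-perplexity$30.npz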

        # Other parameters check
        if verbose:
            print("[Data Loader] Fetching and updating parameters of selected `knn_method`")
        knn_method, knn_params = check_knn_method(knn_method, knn_params)

        if verbose:
            print("[Data Loader] Fetching and updating parameters of selected `method`")
        hd_method, hd_params = check_hd_method(hd_method, hd_params)

        if verbose > 0:
            print("[Data Loader] Params to use for the hd_method: {}".format(hd_params))
        n_neighbors = get_n_neighbors(X.shape[0], n_neighbors, hd_method, hd_params)

        # TODO: hardcoded, streamline
        if knn_method == "hnswlib" and knn_params["search_ef"] == -1:
            if verbose > 0:
                print("[Data Loader] Using default value for `search_ef` in hnswlib: n_neighbors + 1 = {}".format(n_neighbors+1))
            knn_params["search_ef"] = n_neighbors + 1

        if not sample_data:  # If it's not a sample, the matrices are cached

            def load_mat_from_cache(folder, load_data_home, load_filename):
                home_path = os.path.join(load_data_home, folder)
                if not os.path.exists(home_path):
                    os.mkdir(home_path)

                filepath = os.path.join(home_path, load_filename)
                if os.path.exists(filepath):
                    return scipy.sparse.load_npz(filepath), filepath
                else:
                    return None, filepath

            # D matrices caching
            # - Make sure a place where the V matrices are stored is available

            def D_filename(D_dataset, D_pca_components, D_knn_method, D_n_neighbors, D_metric, D_knn_params, other_args):
                fn_str = "Dmat"
                fn_str += "-dataset$%s" % str(D_dataset).split(".")[1]
                fn_str += "-%s" % ('pca$%i' % D_pca_components if (D_pca_components > 0) else 'nopca')
                fn_str += "-knn_method$%s" % str(D_knn_method)
                fn_str += "-n_neighbors$%s" % str(D_n_neighbors)
                fn_str += "-metric$%s" % str(D_metric)
                if D_knn_params is not None and type(D_knn_params) is dict:
                    for k, v in D_knn_params.items():
                        fn_str += "-%s$%s" % (str(k), str(v))
                if other_args is not None and len(other_args) > 0 and type(other_args) is dict:
                    for k, v in other_args.items(): # TODO: assumes that kwargs are valid, but this might not be the case
                        fn_str += "-%s$%s" % (str(k), str(v))
                fn_str += ".npz"
                return fn_str

            filename = D_filename(dataset, pca_components, knn_method, n_neighbors, metric, knn_params, kwargs)
            D, D_filepath = load_mat_from_cache('D_matrices', data_home, filename)

            # V matrices caching
            # - Make sure a place where the V matrices are stored is available

            def V_filename(
                    V_dataset, V_pca_components, V_knn_method, V_n_neighbors, V_metric, V_knn_params, V_hd_method,
                    V_hd_params, other_args
            ):
                fn_str = "Vmat"
                fn_str += "-dataset$%s" % str(V_dataset).split(".")[1]
                fn_str += "-%s" % ('pca$%i' % V_pca_components if (V_pca_components > 0) else 'nopca')
                fn_str += "-knn_method$%s" % str(V_knn_method)
                fn_str += "-n_neighbors$%s" % str(V_n_neighbors)
                fn_str += "-metric$%s" % str(V_metric)
                if V_knn_params is not None and type(V_knn_params) is dict:
                    for k, v in V_knn_params.items():
                        fn_str += "-%s$%s" % (str(k), str(v))
                fn_str += "-hd_method$%s" % str(V_hd_method)
                if V_hd_params is not None and type(V_hd_params) is dict:
                    for k, v in V_hd_params.items():
                        fn_str += "-%s$%s" % (str(k), str(v))
                if other_args is not None and len(other_args) > 0 and type(other_args) is dict:
                    for k, v in other_args.items(): # TODO: assumes that kwargs are valid, but this might not be the case
                        fn_str += "-%s$%s" % (str(k), str(v))
                fn_str += ".npz"
                return fn_str

            filename = V_filename(
                dataset, pca_components, knn_method, n_neighbors, metric, knn_params, hd_method, hd_params, kwargs
            )
            V, V_filepath = load_mat_from_cache('V_matrices', data_home, filename)

        if D is None or V is None:
            if verbose:
                print("[Data Loader] Either D or V was not cached, computing them now ...")
            D, V = hd_matrix(X=X, D=D, V=V,
                             knn_method=knn_method, metric=metric, n_neighbors=n_neighbors, knn_params=knn_params,
                             hd_method=hd_method, hd_params=hd_params, verbose=verbose)
            if not sample_data:
                if verbose:
                    print("[Data Loader] Caching computed matrices ...")
                scipy.sparse.save_npz(D_filepath, D)
                scipy.sparse.save_npz(V_filepath, V)

        if "D" in to_return:
            out.append(D)
        if "V" in to_return:
            out.append(V)
        if sample_idx is not None:
            out.append(sample_idx)

    if len(out) == 1:
        return out[0]
    else:
        return out
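

# A hedged end-to-end usage sketch (illustrative only; the import path
# `hyperbolicTSNE.data_loaders` and the presence of the MNIST files under the
# default `datasets` folder next to this file are assumptions):
#
#     from hyperbolicTSNE.data_loaders import Datasets, load_data
#
#     X, labels, D, V = load_data(
#         Datasets.MNIST,      # dataset to load
#         to_return="all",     # same as "labels_X_D_V": returns X, labels, D, V in that order
#         pca_components=50,   # reduce to 50 PCA dimensions before building D and V
#         verbose=True,
#     )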
