##############################################################
# This file has all the facilities for loading the datasets.
# All methods should return a numpy matrix X and a labels
# vector Y, if available.
##############################################################

import os
import time
import gzip
from enum import Enum, auto

from sklearn.decomposition import PCA, TruncatedSVD
import numpy as np
import scipy.sparse

from .hd_mat_ import hd_matrix, check_knn_method, check_hd_method, get_n_neighbors


# Types of dataset interfaces in scikit learn:
# - Dataset loaders
# - Dataset fetchers
# - Dataset generators

# These methods return a dict with at least two elements:
# - data: array of shape n_samples * n_features
# - target: array of shape n_samples
# if return_X_y == True, methods return (X, y)
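#
# A minimal usage sketch of that convention (hypothetical call; it assumes the
# MNIST .gz files are present under ``datasets/mnist``):
#
#     X, y = load_mnist(kind='train', return_X_y=True)
#     # X: ndarray of shape (60000, 784), y: ndarray of shape (60000,)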


class Datasets(Enum):
    MNIST = auto()  # DONE
    MYELOID = auto()  # DONE
    PLANARIA = auto()  # DONE
    PAUL = auto()  # DONE
    C_ELEGANS = auto()  # DONE
    LUKK = auto()  # DONE
    MYELOID8000 = auto()  # DONE
    WORDNET = auto()  # DONE


def load_mnist(data_home=None, return_X_y=True, kind='all'):
    """
    Loads different versions of the MNIST dataset. The function was taken from
    https://github.com/zalandoresearch/fashion-mnist/blob/master/utils/mnist_reader.py

    Parameters
    ----------
    data_home : str, optional
        Location of the folder where the datasets are stored.
    return_X_y : bool, optional
        If True, the method returns a tuple with the data and its labels;
        otherwise only the data is returned.
    kind : str, optional
        One of 'train', 'test' or 'all'. Selects whether the training set
        (60000 points), the test set (10000 points), or both are loaded.
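
    Examples
    --------
    A minimal sketch, assuming the MNIST ``.gz`` files are available under
    ``datasets/mnist``:

    >>> X, y = load_mnist(kind='test')  # doctest: +SKIP
    >>> X.shape, y.shape                # doctest: +SKIP
    ((10000, 784), (10000,))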
    """

    # Use default location
    if data_home is None:
        data_home = os.path.join(os.path.dirname(__file__), 'datasets')

    full_path = os.path.join(data_home, 'mnist')

    labels_path_train = os.path.join(full_path, 'train-labels-idx1-ubyte.gz')

    labels_path_test = os.path.join(full_path, 't10k-labels-idx1-ubyte.gz')

    images_path_train = os.path.join(full_path, 'train-images-idx3-ubyte.gz')

    images_path_test = os.path.join(full_path, 't10k-images-idx3-ubyte.gz')

    labels_arr = []
    images_arr = []

    if kind == 'all' or kind == 'train':
        with gzip.open(labels_path_train, 'rb') as lbpath:
            br = np.frombuffer(lbpath.read(), dtype=np.uint8, offset=8)
            labels_arr.append(br)

    if kind == 'all' or kind == 'test':
        with gzip.open(labels_path_test, 'rb') as lbpath:
            br = np.frombuffer(lbpath.read(), dtype=np.uint8, offset=8)
            labels_arr.append(br)

    if kind == 'all' or kind == 'train':
        with gzip.open(images_path_train, 'rb') as imgpath:
            br = np.frombuffer(imgpath.read(), dtype=np.uint8, offset=16)
            images = br.reshape(len(labels_arr[0]), 784)
            images_arr.append(images)

    if kind == 'all' or kind == 'test':
        with gzip.open(images_path_test, 'rb') as imgpath:
            br = np.frombuffer(imgpath.read(), dtype=np.uint8, offset=16)
            images = br.reshape(len(labels_arr[-1]), 784)  # last entry: works for kind='all' and kind='test'
            images_arr.append(images)

    labels = np.concatenate(labels_arr, axis=0)
    images = np.concatenate(images_arr, axis=0)

    if return_X_y:
        return images, labels
    else:
        return images


def load_c_elegans(data_home=None, return_X_y=True):
    """
    Loads the C-ELEGANS data available at https://data.caltech.edu/records/1945

    Parameters
    ----------
    data_home : str, optional
        Location of the folder where the datasets are stored.
    return_X_y : bool, optional
        If True, the method returns a tuple with the data and its labels;
        otherwise only the data is returned.
    """
    # Using pathlib instead of os here. TODO: move these to global imports
    from pathlib import Path
    import anndata as ad

    # Use default location
    if data_home is None:
        data_home = Path.joinpath(Path(__file__).parent, "datasets")
    else:
        data_home = Path(str(data_home))  # quick fix to deal with incoming os.paths

    full_path = Path.joinpath(data_home, "c_elegans")

    ad_obj = ad.read_h5ad(str(Path.joinpath(full_path, "packer2019.h5ad")))
    X = ad_obj.X

    labels_str = np.array(ad_obj.obs.cell_type)

    _, labels = np.unique(labels_str, return_inverse=True)

    if return_X_y:
        return X, labels
    else:
        return X


def load_myeloid(data_home=None, return_X_y=True):
    """
    Loads the MYELOID data.

    Parameters
    ----------
    data_home : str, optional
        Location of the folder where the datasets are stored.
    return_X_y : bool, optional
        If True, the method returns a tuple with the data and its labels;
        otherwise only the data is returned.
    """
    # Using pathlib instead of os here. TODO: move these to global imports
    from pathlib import Path

    # Use default location
    if data_home is None:
        data_home = Path.joinpath(Path(__file__).parent, "datasets")
    else:
        data_home = Path(str(data_home))  # quick fix to deal with incoming os.paths

    full_path = Path.joinpath(data_home, "myeloid-progenitors")

    csv_path = str(Path.joinpath(full_path, "MyeloidProgenitors.csv"))

    X = np.loadtxt(csv_path, delimiter=",", skiprows=1, usecols=np.arange(11))

    labels_str = np.loadtxt(csv_path, delimiter=",", skiprows=1, usecols=11, dtype=str)

    _, labels = np.unique(labels_str, return_inverse=True)

    if return_X_y:
        return X, labels
    else:
        return X


def load_myeloid8000(data_home=None, return_X_y=True):
    """
    Loads the MYELOID 8000 data.

    Parameters
    ----------
    data_home : str, optional
        Location of the folder where the datasets are stored.
    return_X_y : bool, optional
        If True, the method returns a tuple with the data and its labels;
        otherwise only the data is returned.
    """
    # Using pathlib instead of os here. TODO: move these to global imports
    from pathlib import Path

    # Use default location
    if data_home is None:
        data_home = Path.joinpath(Path(__file__).parent, "datasets")
    else:
        data_home = Path(str(data_home))  # quick fix to deal with incoming os.paths

    full_path = Path.joinpath(data_home, "myeloid8000")

    X = np.loadtxt(str(Path.joinpath(full_path, "myeloid_8000.csv")), delimiter=",")

    labels_str = np.loadtxt(str(Path.joinpath(full_path, "myeloid_8000_labels.csv")), delimiter=",", dtype=str)

    _, labels = np.unique(labels_str, return_inverse=True)

    if return_X_y:
        return X, labels
    else:
        return X


def load_planaria(data_home=None, return_X_y=True):
    """
    Loads the PLANARIA data available at https://shiny.mdc-berlin.de/psca/

    Parameters
    ----------
    data_home : str, optional
        Location of the folder where the datasets are stored.
    return_X_y : bool, optional
        If True, the method returns a tuple with the data and its labels;
        otherwise only the data is returned.
    """
    # Using pathlib instead of os here. TODO: move these to global imports
    from pathlib import Path

    # Use default location
    if data_home is None:
        data_home = Path.joinpath(Path(__file__).parent, "datasets")
    else:
        data_home = Path(str(data_home))  # quick fix to deal with incoming os.paths

    full_path = Path.joinpath(data_home, "planaria")

    X = np.loadtxt(str(Path.joinpath(full_path, "R_pca_seurat.txt")), delimiter="\t")

    labels_str = np.loadtxt(str(Path.joinpath(full_path, "R_annotation.txt")), delimiter=",", dtype=str)
    _, labels = np.unique(labels_str, return_inverse=True)

    if return_X_y:
        return X, labels
    else:
        return X


def load_wordnet(data_home=None, return_X_y=True):
    """
    Loads the WORDNET data.

    Parameters
    ----------
    data_home : str, optional
        Location of the folder where the datasets are stored.
    return_X_y : bool, optional
        If True, the method returns a tuple with the data and its labels;
        otherwise only the data is returned.
    """
    # Using pathlib instead of os here. TODO: move these to global imports
    from pathlib import Path
    import torch

    # Use default location
    if data_home is None:
        data_home = Path.joinpath(Path(__file__).parent, "datasets")
    else:
        data_home = Path(str(data_home))  # quick fix to deal with incoming os.paths

    full_path = Path.joinpath(data_home, "wordnet")

    model = torch.load(str(Path.joinpath(full_path, "nouns.bin.best")))

    X = np.array(model["embeddings"])

    labels_str = np.array(model["objects"])
    _, labels = np.unique(labels_str, return_inverse=True)

    if return_X_y:
        return X, labels
    else:
        return X


def load_lukk(data_home=None, return_X_y=True):
    """
    Loads the LUKK data.

    Parameters
    ----------
    data_home : str, optional
        Location of the folder where the datasets are stored.
    return_X_y : bool, optional
        If True, the method returns a tuple with the data and its labels;
        otherwise only the data is returned.
    """
    # Using pathlib instead of os here. TODO: move these to global imports
    from pathlib import Path

    # Use default location
    if data_home is None:
        data_home = Path.joinpath(Path(__file__).parent, "datasets")
    else:
        data_home = Path(str(data_home))  # quick fix to deal with incoming os.paths

    full_path = Path.joinpath(data_home, "lukk")

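    # Fast path: if pre-exported numpy arrays are present in the dataset folder,
    # load them directly instead of parsing the large tab-separated source files below.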
    x_path, y_path = Path.joinpath(full_path, "lukk_x.npy"), Path.joinpath(full_path, "lukk_y.npy")
    if x_path.exists() and y_path.exists():
        X, labels = np.load(str(x_path)), np.load(str(y_path))
        return (X, labels) if return_X_y else X

    import pandas as pd
    sample_data_rel = pd.read_csv(
        str(Path.joinpath(full_path, "E-MTAB-62.sdrf.txt")),
        sep='\t',
        index_col='Source Name'
    )

    affymetrix = (
        pd.read_csv(
            str(Path.joinpath(full_path, "E-MTAB-62_processed_2.csv")),
            sep='\t',
            index_col='Hybridization REF',
            dtype='object',
            engine='python'
        )
        .drop('CompositeElement REF')
        .astype('float32')
        .T
        .loc[sample_data_rel.index]
    )

    X = affymetrix.values

    labels_str = sample_data_rel['Factor Value[4 groups from blood to incompletely diff]'].values
    # labels_str = sample_data_rel['Characteristics[4 meta-groups]'].values

    _, labels = np.unique(labels_str, return_inverse=True)

    labels = labels.astype(int)
    if return_X_y:
        return X, labels
    else:
        return X


def _load_dataset(dataset, data_home=None, verbose=False, **kwargs):
    X = None
    labels = None
    if verbose:
        print("[Data Loader] Preparing to load the dataset")
    start = time.time()
    if dataset == Datasets.MNIST:
        X, labels = load_mnist(data_home, **kwargs)
    elif dataset == Datasets.MYELOID:
        X, labels = load_myeloid(data_home, **kwargs)
    elif dataset == Datasets.MYELOID8000:
        X, labels = load_myeloid8000(data_home, **kwargs)
    elif dataset == Datasets.PLANARIA:
        X, labels = load_planaria(data_home, **kwargs)
    elif dataset == Datasets.C_ELEGANS:
        X, labels = load_c_elegans(data_home, **kwargs)
    elif dataset == Datasets.LUKK:
        X, labels = load_lukk(data_home, **kwargs)
    elif dataset == Datasets.WORDNET:
        X, labels = load_wordnet(data_home, **kwargs)
    end = time.time()
    if verbose:
        print("[Data Loader] Data has been loaded; it took {:.2f} s".format(end - start))
    return X, labels


def load_data(dataset, data_home=None, to_return='all', pca_components=100,
              knn_method="sklearn", metric="euclidean", n_neighbors=None, knn_params=None,
              hd_method='vdm2008', hd_params=None,
              sample=-1, random_state=42, verbose=False, **kwargs):
    """
    Loads the selected dataset.

    Parameters
    ----------
    dataset : Datasets
        The selected dataset out of the available ones.
    data_home : str, optional
        Location of the folder where the datasets are stored.
    to_return : str, optional
        Quantities to return, separated by underscores. Possible options are
        `X`, `labels`, `D` and `V`, e.g. 'X_labels'. Use 'all' to obtain all
        of them.
    pca_components : int, optional
        Number of PCA components used to reduce X before D and V are built.
        If 0, PCA is not applied and the full data is used.
        If > 0, X is reduced to min(X.shape[0], X.shape[1], pca_components)
        dimensions.
    knn_method : str, optional
        Method used for the nearest-neighbor search (e.g. 'sklearn', 'hnswlib').
    metric : str, optional
        Distance metric used for the nearest-neighbor search.
    n_neighbors : int, optional
        Number of nearest neighbors. If None, it is derived from the dataset
        size and the selected high-dimensional method.
    knn_params : dict, optional
        Extra parameters for `knn_method`.
    hd_method : str, optional
        Method used to compute the high-dimensional similarity matrix V
        (default 'vdm2008').
    hd_params : dict, optional
        Extra parameters for `hd_method`.
    sample : int or float, optional
        Size of the sample to produce.
        If 0 < sample < 1, it denotes the fraction of the data to return.
        If sample >= 1, it denotes the number of entries of X to return.
        If sample <= 0, no sampling is performed.
    random_state : int, optional
        Sets the random state to generate consistent samples.
        If not positive, the random seed is not set.
    verbose : bool, optional
        If True, progress messages are printed.
    kwargs : dict
        Args to be passed to the specific loading methods.

    Returns
    -------
    X : ndarray
        Matrix of the data of the selected dataset.
    labels : ndarray, optional
        Array with the labels of each observation.
    D : ndarray, optional
        Matrix of distances in high-dimensional space.
    V : ndarray, optional
        Matrix of similarities.
        Can be either a ndarray in squareform (if dense) or a sparse csr matrix.
    sample_idx : ndarray, optional
        List of sampling indices.
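
    Examples
    --------
    A minimal sketch (hypothetical call; it assumes the MNIST files are
    available under the default ``datasets`` folder):

    >>> X, labels = load_data(Datasets.MNIST, to_return='X_labels',
    ...                       pca_components=0)       # doctest: +SKIP
    >>> X.shape[0] == labels.shape[0]                  # doctest: +SKIP
    True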
    """
    # TODO: how to deal with parameter complexity?
    if random_state > 0:
        np.random.seed(random_state)
    D_filepath = None
    V_filepath = None

    # Load selected dataset
    if data_home is None:
        data_home = os.path.join(os.path.dirname(__file__), "datasets")
    raw_X, raw_labels = _load_dataset(dataset, data_home, verbose=verbose, **kwargs)

    # Setup sample
    sample_idx = None
    if sample <= 0:
        sample_data = False
        X, labels = raw_X, raw_labels
    else:
        sample_data = True
        if 0 < sample < 1:  # sample a fraction of the data
            sample = round(sample * raw_X.shape[0])
        # Generate sample
        raw_idx = np.arange(raw_X.shape[0])
        sample_idx = np.sort(np.random.choice(raw_idx, size=sample, replace=False))  # uniformly sampled
        X = raw_X[sample_idx].copy()
        labels = raw_labels[sample_idx].copy()

    # Preprocess data by reducing its dimensionality
    if "X" in to_return or "D" in to_return or "V" in to_return:
        if pca_components > 0:
            pca_components = np.min([pca_components, X.shape[0], X.shape[1]])
            if scipy.sparse.isspmatrix(X):
                if verbose:
                    print("[Data Loader] Input matrix X is sparse ... using sparse PCA")
                pca = TruncatedSVD(n_components=pca_components, random_state=random_state)
                X = pca.fit_transform(X)
            else:
                if verbose:
                    print("[Data Loader] Input matrix X is dense ... using dense PCA")
                pca = PCA(n_components=pca_components, svd_solver="randomized", random_state=random_state)  # remember random state
                X = pca.fit_transform(X)

    X = X.astype(np.float32, copy=False)

    to_return = "labels_X_D_V" if to_return == "all" else to_return
    to_return = to_return.split("_")
    out = []

    if verbose:
        print("[Data Loader] The following elements will be returned: {}".format(", ".join(to_return)))

    if "X" in to_return:
        out.append(X)
    if "labels" in to_return:
        out.append(labels)
    if "D" in to_return or "V" in to_return:
        D = None
        V = None
        # Here, the V matrix is the high-dimensional similarity matrix used by each method.
        # For example, in t-SNE this matrix is the "P" matrix.
        # Cached V files follow the naming convention built in V_filename below:
        # Vmat-dataset$<name>-pca$<n>-knn_method$<m>-n_neighbors$<k>-metric$<metric>-<knn params>-hd_method$<hd>-<hd params>-<kwargs>.npz
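        # For illustration only (hypothetical values), a cached V file could be named
        # something like:
        #   Vmat-dataset$MNIST-pca$100-knn_method$sklearn-n_neighbors$<k>-metric$euclidean-hd_method$vdm2008.npz
        # with any entries of knn_params / hd_params appended as extra -key$value pairs.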

        # Other parameters check
        if verbose:
            print("[Data Loader] Fetching and updating parameters of selected `knn_method`")
        knn_method, knn_params = check_knn_method(knn_method, knn_params)

        if verbose:
            print("[Data Loader] Fetching and updating parameters of selected `method`")
        hd_method, hd_params = check_hd_method(hd_method, hd_params)

        if verbose > 0:
            print("[Data Loader] Params to use for the hd_method: {}".format(hd_params))
        n_neighbors = get_n_neighbors(X.shape[0], n_neighbors, hd_method, hd_params)

        # TODO: hardcoded, streamline
        if knn_method == "hnswlib" and knn_params["search_ef"] == -1:
            if verbose > 0:
                print("[Data Loader] Using default value for `search_ef` in hnswlib: n_neighbors + 1 = {}".format(n_neighbors+1))
            knn_params["search_ef"] = n_neighbors + 1

        if not sample_data:  # If it's not a sample, the matrices are cached on disk

            def load_mat_from_cache(folder, load_data_home, load_filename):
                home_path = os.path.join(load_data_home, folder)
                if not os.path.exists(home_path):
                    os.mkdir(home_path)

                filepath = os.path.join(home_path, load_filename)
                if os.path.exists(filepath):
                    return scipy.sparse.load_npz(filepath), filepath
                else:
                    return None, filepath

            # D matrices caching
            # - Make sure a place where the D matrices are stored is available

            def D_filename(D_dataset, D_pca_components, D_knn_method, D_n_neighbors, D_metric, D_knn_params, other_args):
                fn_str = "Dmat"
                fn_str += "-dataset$%s" % str(D_dataset).split(".")[1]
                fn_str += "-%s" % ('pca$%i' % D_pca_components if (D_pca_components > 0) else 'nopca')
                fn_str += "-knn_method$%s" % str(D_knn_method)
                fn_str += "-n_neighbors$%s" % str(D_n_neighbors)
                fn_str += "-metric$%s" % str(D_metric)
                if D_knn_params is not None and type(D_knn_params) is dict:
                    for k, v in D_knn_params.items():
                        fn_str += "-%s$%s" % (str(k), str(v))
                if other_args is not None and len(other_args) > 0 and type(other_args) is dict:
                    for k, v in other_args.items(): # TODO: assumes that kwargs are valid, but this might not be the case
                        fn_str += "-%s$%s" % (str(k), str(v))
                fn_str += ".npz"
                return fn_str

            filename = D_filename(dataset, pca_components, knn_method, n_neighbors, metric, knn_params, kwargs)
            D, D_filepath = load_mat_from_cache('D_matrices', data_home, filename)

            # V matrices caching
            # - Make sure a place where the V matrices are stored is available

            def V_filename(
                    V_dataset, V_pca_components, V_knn_method, V_n_neighbors, V_metric, V_knn_params, V_hd_method,
                    V_hd_params, other_args
            ):
                fn_str = "Vmat"
                fn_str += "-dataset$%s" % str(V_dataset).split(".")[1]
                fn_str += "-%s" % ('pca$%i' % V_pca_components if (V_pca_components > 0) else 'nopca')
                fn_str += "-knn_method$%s" % str(V_knn_method)
                fn_str += "-n_neighbors$%s" % str(V_n_neighbors)
                fn_str += "-metric$%s" % str(V_metric)
                if V_knn_params is not None and type(V_knn_params) is dict:
                    for k, v in V_knn_params.items():
                        fn_str += "-%s$%s" % (str(k), str(v))
                fn_str += "-hd_method$%s" % str(V_hd_method)
                if V_hd_params is not None and type(V_hd_params) is dict:
                    for k, v in V_hd_params.items():
                        fn_str += "-%s$%s" % (str(k), str(v))
                if other_args is not None and len(other_args) > 0 and type(other_args) is dict:
                    for k, v in other_args.items(): # TODO: assumes that kwargs are valid, but this might not be the case
                        fn_str += "-%s$%s" % (str(k), str(v))
                fn_str += ".npz"
                return fn_str

            filename = V_filename(
                dataset, pca_components, knn_method, n_neighbors, metric, knn_params, hd_method, hd_params, kwargs
            )
            V, V_filepath = load_mat_from_cache('V_matrices', data_home, filename)

        if D is None or V is None:
            if verbose:
                print("[Data Loader] Either D or V was not cached, computing them now ...")
            D, V = hd_matrix(X=X, D=D, V=V,
                             knn_method=knn_method, metric=metric, n_neighbors=n_neighbors, knn_params=knn_params,
                             hd_method=hd_method, hd_params=hd_params, verbose=verbose)
            if not sample_data:
                if verbose:
                    print("[Data Loader] Caching computed matrices ...")
                scipy.sparse.save_npz(D_filepath, D)
                scipy.sparse.save_npz(V_filepath, V)

        if "D" in to_return:
            out.append(D)
        if "V" in to_return:
            out.append(V)
        if sample_idx is not None:
            out.append(sample_idx)

    if len(out) == 1:
        return out[0]
    else:
        return out
