##############################################################
# This file contains all the facilities for loading datasets.
# Every loader returns a numpy matrix X and, when available,
# a labels vector Y.
##############################################################
import os
import time
import gzip
from enum import Enum, auto
from sklearn.decomposition import PCA, TruncatedSVD
import numpy as np
import scipy.sparse
from .hd_mat_ import hd_matrix, check_knn_method, check_hd_method, get_n_neighbors
# Types of dataset interfaces in scikit-learn:
# - Dataset loaders
# - Dataset fetchers
# - Dataset generators
# These methods return a dict-like object with at least two elements:
# - data: array of shape (n_samples, n_features)
# - target: array of shape (n_samples,)
# If return_X_y=True, the methods return the tuple (X, y) instead.
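# For illustration only (not executed here), the scikit-learn convention described
# above looks as follows with one of its built-in loaders:
#
#     from sklearn.datasets import load_digits
#     X, y = load_digits(return_X_y=True)   # X: (1797, 64), y: (1797,)
#
# The loaders in this module follow the same return_X_y idea.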
class Datasets(Enum):
MNIST = auto() # DONE
MYELOID = auto() # DONE
PLANARIA = auto() # DONE
PAUL = auto() # DONE
C_ELEGANS = auto() # DONE
LUKK = auto() # DONE
MYELOID8000 = auto() # DONE
WORDNET = auto() # DONE
def load_mnist(data_home=None, return_X_y=True, kind='all'):
"""
    Loads the MNIST dataset (training split, test split, or both). The reading
    code was adapted from
    https://github.com/zalandoresearch/fashion-mnist/blob/master/utils/mnist_reader.py

    Parameters
    ----------
    data_home : str, optional
        Location of the folder where the datasets are stored. If None, a default
        ``datasets`` folder next to this module is used.
    return_X_y : bool, optional
        If True, returns the tuple (data, labels); otherwise only the data is returned.
    kind : str, optional
        Which split to load: 'train' (60000 points), 'test' (10000 points) or
        'all' (both splits concatenated, the default).
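
    Examples
    --------
    A minimal usage sketch, assuming the gzipped IDX files are available under
    ``datasets/mnist`` next to this module::

        X, y = load_mnist(kind='test')
        # X: uint8 array of shape (10000, 784); y: array of shape (10000,)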
"""
# Use default location
if data_home is None:
data_home = os.path.join(os.path.dirname(__file__), 'datasets')
full_path = os.path.join(data_home, 'mnist')
labels_path_train = os.path.join(full_path, 'train-labels-idx1-ubyte.gz')
labels_path_test = os.path.join(full_path, 't10k-labels-idx1-ubyte.gz')
images_path_train = os.path.join(full_path, 'train-images-idx3-ubyte.gz')
images_path_test = os.path.join(full_path, 't10k-images-idx3-ubyte.gz')
labels_arr = []
images_arr = []
if kind == 'all' or kind == 'train':
with gzip.open(labels_path_train, 'rb') as lbpath:
br = np.frombuffer(lbpath.read(), dtype=np.uint8, offset=8)
labels_arr.append(br)
if kind == 'all' or kind == 'test':
with gzip.open(labels_path_test, 'rb') as lbpath:
br = np.frombuffer(lbpath.read(), dtype=np.uint8, offset=8)
labels_arr.append(br)
if kind == 'all' or kind == 'train':
with gzip.open(images_path_train, 'rb') as imgpath:
br = np.frombuffer(imgpath.read(), dtype=np.uint8, offset=16)
images = br.reshape(len(labels_arr[0]), 784)
images_arr.append(images)
if kind == 'all' or kind == 'test':
with gzip.open(images_path_test, 'rb') as imgpath:
br = np.frombuffer(imgpath.read(), dtype=np.uint8, offset=16)
            images = br.reshape(len(labels_arr[-1]), 784)  # use the labels just read (works for 'all' and 'test')
images_arr.append(images)
labels = np.concatenate(labels_arr, axis=0)
images = np.concatenate(images_arr, axis=0)
if return_X_y:
return images, labels
else:
return images
def load_c_elegans(data_home, return_X_y=True):
"""
Loads C-ELEGANS data available at https://data.caltech.edu/records/1945
    Parameters
    ----------
    data_home : str or None
        Location of the folder where the datasets are stored. If None, a default
        ``datasets`` folder next to this module is used.
    return_X_y : bool, optional
        If True, returns the tuple (data, labels); otherwise only the data is returned.
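
    Examples
    --------
    A minimal usage sketch, assuming ``packer2019.h5ad`` is available under
    ``datasets/c_elegans`` and that ``anndata`` is installed::

        X, labels = load_c_elegans(data_home=None)
        # labels are integer codes derived from the ``cell_type`` annotation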
"""
    # Starting to use pathlib instead of os. TODO: move these to global imports
from pathlib import Path
import anndata as ad
# Use default location
if data_home is None:
data_home = Path.joinpath(Path(__file__).parent, "datasets")
else:
data_home = Path(str(data_home)) # quick fix to deal with incoming os.paths
full_path = Path.joinpath(data_home, "c_elegans")
ad_obj = ad.read_h5ad(str(Path.joinpath(full_path, "packer2019.h5ad")))
X = ad_obj.X
labels_str = np.array(ad_obj.obs.cell_type)
_, labels = np.unique(labels_str, return_inverse=True)
if return_X_y:
return X, labels
else:
return X
def load_myeloid(data_home, return_X_y=True):
"""
Loads MYELOID data.
    Parameters
    ----------
    data_home : str or None
        Location of the folder where the datasets are stored. If None, a default
        ``datasets`` folder next to this module is used.
    return_X_y : bool, optional
        If True, returns the tuple (data, labels); otherwise only the data is returned.
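
    Examples
    --------
    A minimal usage sketch, assuming ``MyeloidProgenitors.csv`` is available under
    ``datasets/myeloid-progenitors``::

        X, labels = load_myeloid(data_home=None)
        # X holds the first 11 numeric columns; labels are integer codes of the last column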
"""
    # Starting to use pathlib instead of os. TODO: move these to global imports
from pathlib import Path
# Use default location
if data_home is None:
data_home = Path.joinpath(Path(__file__).parent, "datasets")
else:
data_home = Path(str(data_home)) # quick fix to deal with incoming os.paths
full_path = Path.joinpath(data_home, "myeloid-progenitors")
X = np.loadtxt(str(Path.joinpath(full_path, "MyeloidProgenitors.csv")), delimiter=",", skiprows=1, usecols=np.arange(11))
labels_str = np.loadtxt(str(Path.joinpath(full_path, "MyeloidProgenitors.csv")), delimiter=",", skiprows=1, usecols=11, dtype=str)
_, labels = np.unique(labels_str, return_inverse=True)
if return_X_y:
return X, labels
else:
return X
def load_myeloid8000(data_home, return_X_y=True):
"""
Loads MYELOID 8000 data.
    Parameters
    ----------
    data_home : str or None
        Location of the folder where the datasets are stored. If None, a default
        ``datasets`` folder next to this module is used.
    return_X_y : bool, optional
        If True, returns the tuple (data, labels); otherwise only the data is returned.
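
    Examples
    --------
    A minimal usage sketch, assuming ``myeloid_8000.csv`` and
    ``myeloid_8000_labels.csv`` are available under ``datasets/myeloid8000``::

        X, labels = load_myeloid8000(data_home=None)
        # labels are integer codes derived from the string labels file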
"""
    # Starting to use pathlib instead of os. TODO: move these to global imports
from pathlib import Path
# Use default location
if data_home is None:
data_home = Path.joinpath(Path(__file__).parent, "datasets")
else:
data_home = Path(str(data_home)) # quick fix to deal with incoming os.paths
full_path = Path.joinpath(data_home, "myeloid8000")
X = np.loadtxt(str(Path.joinpath(full_path, "myeloid_8000.csv")), delimiter=",")
labels_str = np.loadtxt(str(Path.joinpath(full_path, "myeloid_8000_labels.csv")), delimiter=",", dtype=str)
_, labels = np.unique(labels_str, return_inverse=True)
if return_X_y:
return X, labels
else:
return X
def load_planaria(data_home, return_X_y=True):
"""
Loads PLANARIA data available at https://shiny.mdc-berlin.de/psca/
    Parameters
    ----------
    data_home : str or None
        Location of the folder where the datasets are stored. If None, a default
        ``datasets`` folder next to this module is used.
    return_X_y : bool, optional
        If True, returns the tuple (data, labels); otherwise only the data is returned.
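
    Examples
    --------
    A minimal usage sketch, assuming ``R_pca_seurat.txt`` and ``R_annotation.txt``
    are available under ``datasets/planaria``::

        X, labels = load_planaria(data_home=None)
        # X is the tab-separated PCA representation; labels are integer codes of the annotations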
"""
    # Starting to use pathlib instead of os. TODO: move these to global imports
from pathlib import Path
# Use default location
if data_home is None:
data_home = Path.joinpath(Path(__file__).parent, "datasets")
else:
data_home = Path(str(data_home)) # quick fix to deal with incoming os.paths
full_path = Path.joinpath(data_home, "planaria")
X = np.loadtxt(str(Path.joinpath(full_path, "R_pca_seurat.txt")), delimiter="\t")
labels_str = np.loadtxt(str(Path.joinpath(full_path, "R_annotation.txt")), delimiter=",", dtype=str)
_, labels = np.unique(labels_str, return_inverse=True)
if return_X_y:
return X, labels
else:
return X
def load_wordnet(data_home, return_X_y=True):
"""
Loads WORDNET data.
    Parameters
    ----------
    data_home : str or None
        Location of the folder where the datasets are stored. If None, a default
        ``datasets`` folder next to this module is used.
    return_X_y : bool, optional
        If True, returns the tuple (data, labels); otherwise only the data is returned.
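
    Examples
    --------
    A minimal usage sketch, assuming a torch checkpoint ``nouns.bin.best`` with
    ``embeddings`` and ``objects`` entries is available under ``datasets/wordnet``
    and that ``torch`` is installed::

        X, labels = load_wordnet(data_home=None)
        # X holds the noun embeddings; labels are integer codes of the noun identifiers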
"""
    # Starting to use pathlib instead of os. TODO: move these to global imports
from pathlib import Path
import torch
# Use default location
if data_home is None:
data_home = Path.joinpath(Path(__file__).parent, "datasets")
else:
data_home = Path(str(data_home)) # quick fix to deal with incoming os.paths
full_path = Path.joinpath(data_home, "wordnet")
model = torch.load(str(Path.joinpath(full_path, "nouns.bin.best")))
X = np.array(model["embeddings"])
labels_str = np.array(model["objects"])
_, labels = np.unique(labels_str, return_inverse=True)
if return_X_y:
return X, labels
else:
return X
def load_lukk(data_home, return_X_y=True):
"""
Loads LUKK data.
    Parameters
    ----------
    data_home : str or None
        Location of the folder where the datasets are stored. If None, a default
        ``datasets`` folder next to this module is used.
    return_X_y : bool, optional
        If True, returns the tuple (data, labels); otherwise only the data is returned.
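
    Examples
    --------
    A minimal usage sketch, assuming the E-MTAB-62 files (or the cached
    ``lukk_x.npy``/``lukk_y.npy`` arrays) are available under ``datasets/lukk``::

        X, labels = load_lukk(data_home=None)
        # labels encode the 'Factor Value[4 groups from blood to incompletely diff]' annotation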
"""
    # Starting to use pathlib instead of os. TODO: move these to global imports
from pathlib import Path
# Use default location
if data_home is None:
data_home = Path.joinpath(Path(__file__).parent, "datasets")
else:
data_home = Path(str(data_home)) # quick fix to deal with incoming os.paths
full_path = Path.joinpath(data_home, "lukk")
    # Use the cached arrays when available
    if (x := Path.joinpath(full_path, "lukk_x.npy")).exists() and (y := Path.joinpath(full_path, "lukk_y.npy")).exists():
        X, labels = np.load(str(x)), np.load(str(y))
        return (X, labels) if return_X_y else X
import pandas as pd
sample_data_rel = pd.read_csv(
str(Path.joinpath(full_path, "E-MTAB-62.sdrf.txt")),
sep='\t',
index_col='Source Name'
)
affymetrix = (
pd.read_csv(
str(Path.joinpath(full_path, "E-MTAB-62_processed_2.csv")),
sep='\t',
index_col='Hybridization REF',
dtype='object',
engine='python'
)
.drop('CompositeElement REF')
.astype('float32')
.T
.loc[sample_data_rel.index]
)
X = affymetrix.values
labels_str = sample_data_rel['Factor Value[4 groups from blood to incompletely diff]'].values
# labels_str = sample_data_rel['Characteristics[4 meta-groups]'].values
_, labels = np.unique(labels_str, return_inverse=True)
labels = labels.astype(int)
if return_X_y:
return X, labels
else:
return X
def _load_dataset(dataset, data_home=None, verbose=False, **kwargs):
X = None
labels = None
if verbose:
print("[Data Loader] Preparing to load the dataset")
start = time.time()
if dataset == Datasets.MNIST:
X, labels = load_mnist(data_home, **kwargs)
    elif dataset == Datasets.MYELOID:
        X, labels = load_myeloid(data_home, **kwargs)
    elif dataset == Datasets.MYELOID8000:
        X, labels = load_myeloid8000(data_home, **kwargs)
    elif dataset == Datasets.PLANARIA:
        X, labels = load_planaria(data_home, **kwargs)
    elif dataset == Datasets.C_ELEGANS:
        X, labels = load_c_elegans(data_home, **kwargs)
    elif dataset == Datasets.LUKK:
        X, labels = load_lukk(data_home, **kwargs)
    elif dataset == Datasets.WORDNET:
        X, labels = load_wordnet(data_home, **kwargs)
end = time.time()
if verbose:
print("[Data Loader] Data has been loaded and it took {}".format(end - start))
return X, labels
def load_data(dataset, data_home=None, to_return='all', pca_components=100,
knn_method="sklearn", metric="euclidean", n_neighbors=None, knn_params=None,
hd_method='vdm2008', hd_params=None,
sample=-1, random_state=42, verbose=False, **kwargs):
"""
Loads the selected dataset.
    Parameters
    ----------
    dataset : Datasets
        The selected dataset out of the available ones.
    data_home : str, optional
        Location of the folder where the datasets are stored. If None, a default
        ``datasets`` folder next to this module is used.
    to_return : str, optional
        Quantities to return, separated by underscores. Possible options are
        `X`, `labels`, `D` and `V`; use `all` to obtain all of them.
        E.g. 'X_labels'.
    pca_components : int, optional
        Number of principal components to keep before computing D and V.
        If 0, PCA is not applied and the full-dimensional data is used.
        If >0, X is reduced to min(X.shape[0], X.shape[1], pca_components) dimensions.
    knn_method : str, optional
        Nearest-neighbor backend used when computing D (e.g. 'sklearn' or 'hnswlib').
    metric : str, optional
        Distance metric used for the nearest-neighbor search.
    n_neighbors : int, optional
        Number of nearest neighbors. If None, it is derived from the dataset size
        and the parameters of `hd_method`.
    knn_params : dict, optional
        Extra parameters for the nearest-neighbor backend.
    hd_method : str, optional
        Method used to compute the high-dimensional similarity matrix V
        (e.g. 'vdm2008', the t-SNE "P" matrix).
    hd_params : dict, optional
        Extra parameters for `hd_method`.
    sample : int or float, optional
        Size of the sample to produce.
        If 0 < sample < 1, it denotes the fraction of the data to return.
        If sample >= 1, it denotes the number of rows of X to return.
        If sample <= 0 (the default -1), the full dataset is used.
    random_state : int, optional
        Seed used to generate consistent samples. If not positive, the seed is not set.
    verbose : bool, optional
        If True, progress messages are printed.
    kwargs : dict
        Extra arguments passed to the specific loading method.

    Returns
    -------
    X : ndarray
        Matrix with the data of the selected dataset.
    labels : ndarray, optional
        Array with the label of each observation.
    D : ndarray or sparse matrix, optional
        Matrix of distances in high-dimensional space.
    V : ndarray or sparse matrix, optional
        Matrix of similarities. Either an ndarray in squareform (if dense) or a
        sparse csr matrix.
    sample_idx : ndarray, optional
        Indices of the sampled rows (only returned when sampling is used).
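
    Examples
    --------
    A minimal usage sketch, assuming the MNIST files are available in the
    default ``datasets`` folder::

        # everything: data, labels, distance matrix and similarity matrix
        X, labels, D, V = load_data(Datasets.MNIST, to_return='all', pca_components=100)
        # only the (PCA-reduced) data and its labels
        X, labels = load_data(Datasets.MNIST, to_return='X_labels')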
"""
# TODO: how to deal with parameter complexity?
if random_state > 0:
np.random.seed(random_state)
D_filepath = None
V_filepath = None
# Load selected dataset
if data_home is None:
data_home = os.path.join(os.path.dirname(__file__), "datasets")
raw_X, raw_labels = _load_dataset(dataset, data_home, verbose=verbose, **kwargs)
# Setup sample
sample_idx = None
if sample <= 0:
sample_data = False
X, labels = raw_X, raw_labels
else:
sample_data = True
if 0 < sample < 1: # sample a fraction of the data
sample = round(sample * raw_X.shape[0])
# Generate sample
raw_idx = np.arange(raw_X.shape[0])
sample_idx = np.sort(np.random.choice(raw_idx, size=sample, replace=False)) # uniformly sampled
X = raw_X[sample_idx].copy()
labels = raw_labels[sample_idx].copy()
# Preprocess data by reducing its dimensionality
if "X" in to_return or "D" in to_return or "V" in to_return:
if pca_components > 0:
pca_components = np.min([pca_components, X.shape[0], X.shape[1]])
if scipy.sparse.isspmatrix(X):
if verbose:
print("[Data Loader] Input matrix X is sparse ... using sparse PCA")
pca = TruncatedSVD(n_components=pca_components, random_state=random_state)
X = pca.fit_transform(X)
else:
if verbose:
print("[Data Loader] Input matrix X is dense ... using dense PCA")
pca = PCA(n_components=pca_components, svd_solver="randomized", random_state=random_state) # remember random state
X = pca.fit_transform(X)
X = X.astype(np.float32, copy=False)
to_return = "labels_X_D_V" if to_return == "all" else to_return
to_return = to_return.split("_")
out = []
if verbose:
print("[Data Loader] The following elements will be returned: {}".format(", ".join(to_return)))
if "X" in to_return:
out.append(X)
if "labels" in to_return:
out.append(labels)
if "D" in to_return or "V" in to_return:
D = None
V = None
        # Here, the V matrix is the high-dimensional similarity matrix used by each method.
        # For example, in t-SNE, this matrix is the "P" matrix.
        # Cached V matrix files follow the naming convention built by `V_filename` below:
        # Vmat-dataset$<name>-pca$<components>-knn_method$<method>-n_neighbors$<k>-metric$<metric>[-<knn params>]-hd_method$<method>[-<hd params>][-<kwargs>].npz
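        # An illustrative (hypothetical) cached filename, assuming MNIST with the default
        # parameters and no extra knn/hd parameters, would look like:
        #   Vmat-dataset$MNIST-pca$100-knn_method$sklearn-n_neighbors$90-metric$euclidean-hd_method$vdm2008.npz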
# Other parameters check
if verbose:
print("[Data Loader] Fetching and updating parameters of selected `knn_method`")
knn_method, knn_params = check_knn_method(knn_method, knn_params)
if verbose:
print("[Data Loader] Fetching and updating parameters of selected `method`")
hd_method, hd_params = check_hd_method(hd_method, hd_params)
if verbose > 0:
print("[Data Loader] Params to use for the hd_method: {}".format(hd_params))
n_neighbors = get_n_neighbors(X.shape[0], n_neighbors, hd_method, hd_params)
# TODO: hardcoded, streamline
if knn_method == "hnswlib" and knn_params["search_ef"] == -1:
if verbose > 0:
print("[Data Loader] Using default value for `search_ef` in hnswlib: n_neighbors + 1 = {}".format(n_neighbors+1))
knn_params["search_ef"] = n_neighbors + 1
        if not sample_data:  # if it's not a sample, the matrices are cached on disk
def load_mat_from_cache(folder, load_data_home, load_filename):
home_path = os.path.join(load_data_home, folder)
if not os.path.exists(home_path):
os.mkdir(home_path)
filepath = os.path.join(home_path, load_filename)
if os.path.exists(filepath):
return scipy.sparse.load_npz(filepath), filepath
else:
return None, filepath
# D matrices caching
            # - Make sure a place where the D matrices are stored is available
def D_filename(D_dataset, D_pca_components, D_knn_method, D_n_neighbors, D_metric, D_knn_params, other_args):
fn_str = "Dmat"
fn_str += "-dataset$%s" % str(D_dataset).split(".")[1]
fn_str += "-%s" % ('pca$%i' % D_pca_components if (D_pca_components > 0) else 'nopca')
fn_str += "-knn_method$%s" % str(D_knn_method)
fn_str += "-n_neighbors$%s" % str(D_n_neighbors)
fn_str += "-metric$%s" % str(D_metric)
if D_knn_params is not None and type(D_knn_params) is dict:
for k, v in D_knn_params.items():
fn_str += "-%s$%s" % (str(k), str(v))
if other_args is not None and len(other_args) > 0 and type(other_args) is dict:
for k, v in other_args.items(): # TODO: assumes that kwargs are valid, but this might not be the case
fn_str += "-%s$%s" % (str(k), str(v))
fn_str += ".npz"
return fn_str
filename = D_filename(dataset, pca_components, knn_method, n_neighbors, metric, knn_params, kwargs)
D, D_filepath = load_mat_from_cache('D_matrices', data_home, filename)
# V matrices caching
# - Make sure a place where the V matrices are stored is available
def V_filename(
V_dataset, V_pca_components, V_knn_method, V_n_neighbors, V_metric, V_knn_params, V_hd_method,
V_hd_params, other_args
):
fn_str = "Vmat"
fn_str += "-dataset$%s" % str(V_dataset).split(".")[1]
fn_str += "-%s" % ('pca$%i' % V_pca_components if (V_pca_components > 0) else 'nopca')
fn_str += "-knn_method$%s" % str(V_knn_method)
fn_str += "-n_neighbors$%s" % str(V_n_neighbors)
fn_str += "-metric$%s" % str(V_metric)
if V_knn_params is not None and type(V_knn_params) is dict:
for k, v in V_knn_params.items():
fn_str += "-%s$%s" % (str(k), str(v))
fn_str += "-hd_method$%s" % str(V_hd_method)
if V_hd_params is not None and type(V_hd_params) is dict:
for k, v in V_hd_params.items():
fn_str += "-%s$%s" % (str(k), str(v))
if other_args is not None and len(other_args) > 0 and type(other_args) is dict:
for k, v in other_args.items(): # TODO: assumes that kwargs are valid, but this might not be the case
fn_str += "-%s$%s" % (str(k), str(v))
fn_str += ".npz"
return fn_str
filename = V_filename(
dataset, pca_components, knn_method, n_neighbors, metric, knn_params, hd_method, hd_params, kwargs
)
V, V_filepath = load_mat_from_cache('V_matrices', data_home, filename)
if D is None or V is None:
if verbose:
print("[Data Loader] Either D or V was not cached, computing them now ...")
D, V = hd_matrix(X=X, D=D, V=V,
knn_method=knn_method, metric=metric, n_neighbors=n_neighbors, knn_params=knn_params,
hd_method=hd_method, hd_params=hd_params, verbose=verbose)
if not sample_data:
if verbose:
print("[Data Loader] Caching computed matrices ...")
scipy.sparse.save_npz(D_filepath, D)
scipy.sparse.save_npz(V_filepath, V)
if "D" in to_return:
out.append(D)
if "V" in to_return:
out.append(V)
if sample_idx is not None:
out.append(sample_idx)
if len(out) == 1:
return out[0]
else:
return out