from dppy.finite_dpps import FiniteDPP
import abc
from collections import namedtuple
from typing import Sequence
import numpy as np
# Immutable record pairing a feature matrix ``X`` with its targets ``y``
# (``y`` is ``None`` when a dataset was fitted without targets).
XYPair = namedtuple("XYPair", ("X", "y"))
def get_sampler_class(sampler_type="dpp"):
    """Return the sampler class registered under ``sampler_type``.

    Args:
        sampler_type: ``"dpp"`` selects ``DPPYSampler``; ``"deter-dpp"``
            selects ``DeterministicDPPSampler``.

    Returns:
        The sampler class itself (not an instance).

    Raises:
        ValueError: If ``sampler_type`` is not one of the supported names.
    """
    if sampler_type == "dpp":
        return DPPYSampler
    if sampler_type == "deter-dpp":
        return DeterministicDPPSampler
    # Report the rejected value so a typo in the caller is easy to spot.
    raise ValueError(
        'sampler_type must be "dpp" or "deter-dpp" '
        f"(got {sampler_type!r})"
    )
class Sampler(abc.ABC):
    """Base class for samplers that sketch (select a subset of rows of) datasets.

    A sampler is fitted on a single 2-D dataset or on a batch of equally
    shaped datasets, optionally with targets. Subclasses implement
    :meth:`sketch_idxs`; :meth:`sketch` uses the indices it returns to slice
    the fitted data.

    Attributes set by this class:
        verbose: Verbosity level; ``>= 1`` prints sketch sizes, ``>= 2`` also
            prints the selected indices.
        datasets: ``None`` until fitted, then a list of ``XYPair(X, y)``.
        parallel: ``False`` until fitted; set to ``True`` by ``fit``.
        logging_depth: Initialised to ``0``; not used by this class.
        concat: Whether ``fit`` appends ``y`` to ``X`` in ``full_data``.
        normalized: Stored as given; not used by this class.
        full_data: 3-D array ``(datasets, rows, columns)`` set by ``fit``.
    """

    def __init__(self, X=None, y=None, concat=False, normalized=False, verbose=0):
        """Store the configuration and fit immediately when ``X`` is given.

        Raises:
            ValueError: If ``y`` is given without ``X``, or if ``fit`` rejects
                the data.
        """
        self.verbose = verbose
        self.datasets = None
        self.parallel = False
        self.logging_depth = 0
        self.concat = concat
        self.normalized = normalized
        if X is not None:
            self.fit(X, y)
        elif y is not None:
            raise ValueError("Must provide X if any of the following arguments are provided: {y}")

    def fit(self, X=None, y=None):
        """Validate the data and store it as a list of ``XYPair`` datasets.

        Accepted inputs:
            * a 2-D ``X`` (one dataset), optionally with ``y``;
            * a 3-D ``X`` / sequence of equally shaped 2-D ``Xi`` (a batch),
              optionally with a matching ``y``;
            * a tuple ``(X, y)``;
            * a list of ``(Xi, yi)`` tuples.

        Returns:
            ``self``, to allow chaining.

        Raises:
            ValueError: If the data is missing, empty, not 2-D/3-D, or if
                ``y`` does not line up with ``X``.
        """
        # Case 1: X is a tuple of (X, y).
        if isinstance(X, tuple):
            if len(X) != 2:
                raise ValueError("Please provide a tuple of (X, y)")
            X, y = X
        if X is None:
            raise ValueError("X is not valid")

        # Case 2: X is a list of (Xi, yi) tuples; split it into a batch of
        # Xi's and a batch of yi's.
        if isinstance(X, (list, tuple)) and len(X) > 0 and isinstance(X[0], tuple):
            if len(X[0]) != 2:
                raise ValueError("Please provide a list of (X, y) tuples")
            X, y = zip(*X)

        # Only now is it safe to convert: (X, y) tuples are ragged and must
        # not be passed to NumPy as a whole.
        X = np.array(X)
        y = None if y is None else np.array(y)

        if X.ndim not in (2, 3):
            raise ValueError("X is not valid")

        # Case 3: a single dataset becomes a batch of one.
        if X.ndim == 2:
            X = np.expand_dims(X, 0)
            if y is not None:
                y = np.expand_dims(y, 0)

        # Case 4: X is now always a batch of Xi's.
        if y is None:
            datasets = [XYPair(Xi, None) for Xi in X]
        else:
            if y.shape[:2] != X.shape[:2]:  # Can't zip them together
                raise ValueError(f"y:{y.shape} is not aligned with X:{X.shape}")
            datasets = [XYPair(Xi, yi) for Xi, yi in zip(X, y)]

        if X.size == 0:
            # A failed fit leaves the sampler unfitted.
            self.datasets = None
            raise ValueError("Found an empty X")

        # X is a regular 3-D array here, so every Xi already shares one shape
        # and (after the alignment check) every yi has one entry per row.
        self.parallel = True
        self.datasets = datasets

        if y is not None and self.concat:
            # Append the targets as extra trailing column(s) of each dataset.
            y_columns = y.reshape(X.shape[0], X.shape[1], -1)
            self.full_data = np.concatenate([X, y_columns], axis=-1)
        else:
            self.full_data = X
        return self

    @staticmethod
    def _take(Xi, yi, idxs):
        """Slice one dataset by ``idxs``; return ``X`` alone when there is no ``y``."""
        if yi is None:
            return Xi[idxs]
        return (Xi[idxs], yi[idxs])

    def sketch(self, nsamples=1):
        """Return the rows selected by :meth:`sketch_idxs`.

        Entry ``i`` of the value returned by ``sketch_idxs`` is applied to
        ``self.datasets[i]``. With ``nsamples == 1`` that entry is one index
        set; otherwise it is iterated as a sequence of index sets.

        Returns:
            Per dataset either ``X[idxs]`` or ``(X[idxs], y[idxs])`` (a list
            of those when ``nsamples != 1``). The outer list is unwrapped
            when only one dataset was fitted.

        Raises:
            ValueError: If the sampler has not been fitted.
        """
        if not self.is_fitted:
            raise ValueError(f"need to fit this {str(self.__class__)} first!")

        lst_idxs = self.sketch_idxs(nsamples)
        for i, idxs in enumerate(lst_idxs):
            if self.verbose >= 1:
                print(f"Sketch {i}: {self.datasets[i].X.shape} -> {len(idxs)}")
            if self.verbose >= 2:
                print(" - Indexes selected: ", idxs)

        result = []
        for (Xi, yi), idxs in zip(self.datasets, lst_idxs):
            if nsamples == 1:
                result.append(self._take(Xi, yi, idxs))
            else:
                result.append([self._take(Xi, yi, idx) for idx in idxs])

        if not self.parallel or len(self.datasets) == 1:
            result = result[0]
        return result

    @abc.abstractmethod
    def sketch_idxs(self, nsamples=1) -> Sequence[Sequence]:
        """Return the selected row indices, one entry per fitted dataset.

        ``nsamples`` is what :meth:`sketch` passes through; it defaults to
        ``1`` so that calling without arguments keeps working.
        """

    @property
    def d(self):
        """Number of columns of the first fitted dataset."""
        return self.datasets[0].X.shape[-1]

    @property
    def n(self):
        """Number of rows of the first fitted dataset."""
        return self.datasets[0].X.shape[0]

    @property
    def num_datasets(self):
        """Number of fitted datasets."""
        return len(self.datasets)

    @property
    def is_fitted(self):
        """``True`` once ``fit`` has stored at least one dataset list."""
        return self.datasets is not None
# Sampler that builds one dppy ``FiniteDPP`` per entry of ``self.full_data``
# and draws row indices from those DPPs.
# NOTE(review): this block has lost its indentation and several lines/fragments
# (see the notes below); it does not parse as shown and needs restoring from
# version control before any behavioural change is attempted.
class DPPYSampler(Sampler):
# Store the sampling options, then delegate to the base initializer.
# ``mode`` and ``k`` are assigned before ``super().__init__`` is called
# (presumably because the base initializer may fit immediately - confirm).
def __init__(self, X=None, y=None, k=None, concat=False, normalized=False, mode='GS', verbose=0):
self.mode = mode
self.k = k
super().__init__(X, y, concat, normalized, verbose)
# NOTE(review): the dict literal below is cut off after its first entry; the
# remaining keys and the closing brace are not visible here.
self.args = {'normalized':normalized,
# Fit via the base class, then create the DPP objects from ``self.full_data``.
# NOTE(review): ``y`` has no default here although ``Sampler.fit`` declares
# ``y=None``, and no ``return`` is visible although ``Sampler.fit`` returns
# ``self`` - confirm whether that is intended.
def fit(self, X, y):
super().fit(X, y)
# NOTE(review): the arguments passed to ``FiniteDPP`` after "likelihood" are
# missing from this line; what is built from each ``Xi`` cannot be told from here.
self.dpps = [FiniteDPP("likelihood", for Xi in self.full_data]
# Draw ``nsamples`` index samples from every DPP created by ``fit``.
def sketch_idxs(self, nsamples=1):
lst_idxs = []
for dpp in self.dpps:
lst_samples = []
for _ in range(nsamples):
# A truthy ``self.k`` requests a fixed-size sample via ``sample_exact_k_dpp``.
# NOTE(review): as shown, the following ``sample_exact()`` line would overwrite
# that result unconditionally - an ``else:`` line appears to be missing.
if self.k:
idxs = dpp.sample_exact_k_dpp(size=self.k, mode=self.mode)
idxs = dpp.sample_exact()
# NOTE(review): no visible line appends ``idxs`` to ``lst_samples`` or
# ``lst_samples`` to ``lst_idxs``; the bodies of these branches look truncated.
if nsamples==1:
lst_idxs = np.array(lst_idxs)
# For several samples the first two axes are swapped before returning.
# NOTE(review): ``Sampler.sketch`` zips this return value against
# ``self.datasets`` - confirm the swapped axis order is what it expects.
if nsamples>1:
return np.transpose(lst_idxs, (1,0,2))
return lst_idxs
def deterministic_k_dpp(X, k=None):
    """Greedily select up to ``k`` row indices of ``X`` without randomness.

    The likelihood kernel is ``L = X @ X.T``. Its top-``k`` eigenvectors
    define ``P = V_k V_k^T / k``; starting from the diagonal of ``P``, each
    step picks the row with the largest score and then rescores every row
    ``j`` as ``P_jj - P_cj^T pinv(P_cc) P_cj`` for the selected set ``c``.

    Args:
        X: 2-D array with one item per row.
        k: Number of rows to select. When falsy, the numerical rank of ``L``
            is used.

    Returns:
        List of selected row indices in selection order. It can be shorter
        than ``k`` if a step re-selects an index that was already chosen.

    Raises:
        ValueError: If ``X`` is not 2-D.
    """
    if X.ndim != 2:
        raise ValueError(f"X must be 2-D, got shape {X.shape}")

    L = X @ X.T
    if not k:
        # NOTE(review): the original default for a missing ``k`` was lost from
        # this file; the numerical rank of L is used - confirm.
        k = int(np.linalg.matrix_rank(L))
    if k == 0:
        # Nothing to select (e.g. an all-zero X); also avoids dividing by 0.
        return []

    # eigh returns eigenvalues in ascending order: keep the last k
    # eigenvectors and flip them into descending order.
    _, v = np.linalg.eigh(L)
    v_k = v[:, -k:][:, ::-1]

    P = (v_k @ v_k.T) / k
    p_0 = (np.linalg.norm(v_k, axis=1) ** 2) / k
    p = p_0.copy()

    c = []
    for _ in range(k):
        ci = int(np.argmax(p))
        if ci not in c:
            c.append(ci)
        # Rescore all rows at once: p[j] = p_0[j] - P_cj^T pinv(P_cc) P_cj.
        P_c = P[c]
        P_cc_inv = np.linalg.pinv(P_c[:, c])
        p = p_0 - np.sum(P_c * (P_cc_inv @ P_c), axis=0)
    return c
# Sampler that selects row indices by calling ``deterministic_k_dpp`` on every
# entry of ``self.full_data``.
# NOTE(review): this block has lost its indentation and several lines/fragments
# (see the notes below); it does not parse as shown and needs restoring from
# version control before any behavioural change is attempted.
class DeterministicDPPSampler(Sampler):
# Store the options, then delegate to the base initializer.
# ``mode`` is stored but not read anywhere in the visible lines of this class.
def __init__(self, X=None, y=None, k=None, concat=False, normalized=False, mode='GS', verbose=0):
self.mode = mode
self.k = k
super().__init__(X, y, concat, normalized, verbose)
# NOTE(review): the dict literal below is cut off after its first entry; the
# remaining keys and the closing brace are not visible here.
self.args = {'normalized':normalized,
# Delegates to ``Sampler.fit``.
# NOTE(review): ``y`` has no default here although ``Sampler.fit`` declares
# ``y=None``, and no ``return`` is visible although ``Sampler.fit`` returns
# ``self`` - confirm whether that is intended.
def fit(self, X, y):
super().fit(X, y)
# Select indices for every dataset in ``self.full_data``.
def sketch_idxs(self, nsamples=1):
lst_idxs = []
for Xi in self.full_data:
# One sample: a single selection on the full dataset.
# NOTE(review): an ``else:`` before the multi-sample code below and the
# ``append`` calls that fill ``lst_samples`` / ``lst_idxs`` are not visible;
# as shown, ``lst_idxs`` is still empty when it is reshaped.
if nsamples==1:
idxs = deterministic_k_dpp(Xi, self.k)
lst_samples = []
# ``mask`` marks the rows of ``Xi`` that are still available for selection.
mask = np.array([True]*Xi.shape[0])
new_X = Xi.copy()
for i in range(nsamples):
idxs = deterministic_k_dpp(new_X, self.k)
# ``idxs`` index into the reduced ``new_X``; map them back to row positions
# of ``Xi`` through the positions where ``mask`` is still True.
original_idxs = np.argwhere(mask).flatten()[idxs]
# Clear the mask at the rows just selected so the next round cannot pick them.
# The comprehension's ``i`` is local to the comprehension and does not rebind
# the loop variable above.
mask = np.array([m if (i not in original_idxs) else False for i, m in enumerate(mask)])
new_X = Xi[mask]
# Arrange as (datasets, samples, indices), swap the first two axes, then drop
# every length-1 axis.
# NOTE(review): ``Sampler.sketch`` zips this return value against
# ``self.datasets``; confirm that the axis swap and ``np.squeeze`` (which also
# removes the dataset axis when only one dataset is fitted) match what it expects.
lst_idxs = np.array(lst_idxs).reshape(len(self.full_data), nsamples, -1)
lst_idxs = np.transpose(lst_idxs, (1,0,2))
return np.squeeze(lst_idxs)