#!/usr/bin/env python
"""
Generate synthetic data for a binary classification problem.

Inspired by sklearn.datasets.make_classification.
https://scikit-learn.org/stable/modules/generated/sklearn.datasets.make_classification.html

Generate tabular data that provides ground truth for post-hoc explainability of black-box models,
with user-specified control over:
    1. marginal distributions for features
    2. correlation structure
    3. nonlinearity & interactions (otherwise, why use advanced ML techniques?)
    4. noise / class overlap
    5. categorical features (stretch goal)
    6. outliers (stretch goal)

    TODO:
        - class_sep - spread out our class clusters
        - add noise to X & y (see e.g. flip_y)
        - add repeated? add redundant?
        - add weights for balanced/unbalanced
        - scale - set min/max?
        - shuffle
"""

import numpy as np
from scipy import stats
from sklearn.preprocessing import MinMaxScaler


def transform_to_distribution(x, adict):
    """
    Input:
        x - input uniform distributed random variable
        adict - dictionary corresponding to the desired distribution (name & params)
        e.g. {'col':[list], 'dist':'normal', 'kwargs':{'loc':0.0, 'scale':1.0, 'size'=n_samples}}
    Output:
        x_samples - the transformed vector with desired distribution
    """

    if "args" not in adict.keys():
        adict["args"] = {}

    if "kwargs" not in adict.keys():
        adict["kwargs"] = {}

    method_gen = getattr(stats, adict["dist"])
    method_specific = method_gen(**adict["args"], **adict["kwargs"])
    x_samples = method_specific.ppf(x)

    return x_samples
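
# A minimal usage sketch (assumed, not part of the original script): transform
# uniform samples into a standard normal marginal via the inverse CDF.
#
#   u = stats.uniform().rvs(1000, random_state=42)
#   spec = {"column": 0, "dist": "norm", "kwargs": {"loc": 0.0, "scale": 1.0}}
#   z = transform_to_distribution(u, spec)  # z is approximately N(0, 1)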


def eval_expr_for_sample(x, col_map, expr):
    """
    Inputs:
        x - 1D array with shape = number of symbols
        col_map - dictionary with keys = symbols, values = columns of X
        expr - sympy expression that gives y = f(X)
    Output:
        evaluated expression
    """
    # print(type(x), type(col_map), type(expr))
    # print(x.shape)

    # for a sample, build a dictionary of symbol:value
    my_sub = {}

    for i, key_symbol in enumerate(col_map.keys()):
        my_sub[key_symbol] = x[i]

    # print(my_sub)
    out_value = expr.evalf(subs=my_sub)

    return out_value
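
# A minimal usage sketch (assumed names, not from the original script):
# evaluate y = x1 + 2*x2 for one sample, with symbols mapped to columns 0 and 1.
#
#   import sympy as sym
#   x1, x2 = sym.symbols("x1 x2")
#   col_map = {x1: 0, x2: 1}
#   expr = x1 + 2 * x2
#   eval_expr_for_sample(np.array([0.5, 1.0]), col_map, expr)  # -> 2.5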


def sigmoid(x, k=1.0, x0=None):
    """ sigmoid/logistic function """

    if x0 is None:
        x0 = x.mean()
    sig = 1.0 / (1.0 + np.exp(-k * (x - x0)))

    return sig
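
# A quick sanity check (assumed, not part of the original script): at the
# midpoint x0 the sigmoid returns 0.5, and larger k steepens the transition.
#
#   sigmoid(np.array([-1.0, 0.0, 1.0]), k=1.0, x0=0.0)
#   # -> array([0.26894142, 0.5, 0.73105858])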


# an example call, e.g. from a driver script (expanded in the sketch below):
# X, y_reg, y_prob, y_labels = make_tabular_data(
#    n_samples=1000, cov=cov, col_map=col_map, expr=expr, p_thresh=0.5)
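
# An expanded, hedged end-to-end sketch (variable names are assumptions, not
# from the original script): two correlated informative features combined
# through a sympy expression, then thresholded into binary labels.
#
#   import sympy as sym
#   x1, x2 = sym.symbols("x1 x2")
#   col_map = {x1: 0, x2: 1}
#   cov = np.array([[1.0, 0.5], [0.5, 1.0]])
#   expr = x1 + x2 ** 2
#   X, y_reg, y_prob, y_labels = make_tabular_data(
#       n_samples=1000, cov=cov, col_map=col_map, expr=expr, p_thresh=0.5, seed=42
#   )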


def generate_x_noise(X, noise_level_x, seed=None):
    """
    inputs - X (used to determine shape of output matrix)
            noise_level_x : strength of the noise (range 0 to 1)
    outputs:
        x_noise - array with the same dimension as X with gaussian white noise
    """
    n_samples = X.shape[0]
    n_total = X.shape[1]

    # generate covariance matrix - 1's on diagonal - everything else is 0
    cov = np.zeros((n_total, n_total))
    np.fill_diagonal(cov, 1.0)

    # generate our gaussian white noise
    means = np.zeros(n_total)
    mvnorm = stats.multivariate_normal(mean=means, cov=cov)
    x_noise = noise_level_x * mvnorm.rvs(n_samples, random_state=seed)
    # print("in noise x_noise - ", x_noise.shape)

    return x_noise
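
# A minimal usage sketch (assumed, not part of the original script): the noise
# level scales unit-variance Gaussian noise, so the result has std ~= noise_level_x.
#
#   noise = generate_x_noise(np.zeros((1000, 3)), noise_level_x=0.1, seed=42)
#   # -> shape (1000, 3), white noise with standard deviation ~0.1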


def resolve_covariant(n_total, covariant=None):
    """Resolves a covariant in the following cases:
        - If a covariant is not provided a diagonal matrix of 1s is generated, and symmetry is checked via a comparison with the datasets transpose
        - If a covariant is provided, the symmetry is checked

    args:
        n_total {int} -- total number of informative features
        covariant {[type]} -- [description] (default: {None})
    returns:
        covariant {np_array}
    """

    if covariant is None:
        print("No covariant provided, generating one.")
        covariant = np.diag(np.ones(n_total))

    # test for symmetry on covariance matrix by comparing the matrix to its transpose
    try:
        assert np.all(covariant == covariant.T)
    except AssertionError:
        print("Assertion error - please check covariance matrix is symmetric.")

    return covariant


def pre_data_generation_checks(n_informative, col_map, n_total):
    """This function is used to ensure input, and input combinations are correct before
    generating synthetic data

    args:
        n_informative {int} -- n_informative - number of informative features - need to appear at least once in expression
        col_map {dict} -- dictionary mapping sympy symbols to columns
        n_total {int} -- total number of samples in the dataset
    """

    try:
        err = "number of dictionary keys in col_map not equal to n_informative."
        assert n_informative == len(col_map)
        err = "total number of samples (n_informative + n_nuisance) must be greater than 0"
        assert n_total > 0
    except AssertionError:
        raise Exception(f"Error - {err}")


def generate_redundant_features(x, n_informative, n_redundant, seed):
    """Generate redundant features as random linear combinations of the informative ones.

    args:
        x {np.ndarray} -- informative features, shape (n_samples, n_informative)
        n_informative {int} -- number of informative features
        n_redundant {int} -- number of redundant features to generate
        seed -- seed for the random mixing matrix (for repeatability)
    returns:
        x_redundant {np.ndarray} -- shape (n_samples, n_redundant)
    """
    generator = np.random.RandomState(seed)
    # mixing matrix with entries drawn uniformly from [-1, 1)
    B = 2 * generator.rand(n_informative, n_redundant) - 1
    x_redundant = np.dot(x, B)

    return x_redundant
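
# A minimal usage sketch (assumed, not part of the original script): mix two
# informative columns into one redundant column.
#
#   x_inf = np.random.RandomState(0).randn(5, 2)
#   generate_redundant_features(x_inf, n_informative=2, n_redundant=1, seed=0)
#   # -> shape (5, 1), a fixed linear combination of the two input columns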


def make_tabular_data(
    n_samples=1000,
    n_informative=2,
    n_redundant=0,
    n_nuisance=0,
    n_classes=2,
    dist=None,
    cov=None,
    col_map=None,
    expr=None,
    sig_k=1.0,
    sig_x0=None,
    p_thresh=0.5,
    noise_level_x=0.0,
    noise_level_y=0.0,
    seed=None,
):
    """
    Use copulas and marginal distributions to build the joint probability of X.
    args:
        n_samples - number of samples to generate
        n_informative - number of informative features - need to appear at least once in expression
        n_redundant - number of redundant features
        n_nuisance - number of nuiscance features with no signal (noise)
        n_classes - number of classes for labeling (default is binary classification)
        dist - list of dicts for marginal distributions to apply to columns
            each dict specifies column, distribution and dictionary of args & kwds
            suport for distributions available in  scipy stats:
            https://docs.scipy.org/doc/scipy/reference/stats.html
        cov - a symmetric matrix specifying covariance amongst features
        col_map - dictionary mapping sympy symbols to columns
        expr - sympy expression holding y = f(x)
        p_thresh - probability threshold for assigning class labels
        noise_level_x (float) - level of white noise (jitter) added to x
        noise_level_y (float) - level of white noise added to y (think flip_y)
        seed - numpy random state object for repeatability




    returns X: array of shape [n_samples, n_total] where n_total = n_inform + n_nuisance, etc.
            y: array of shape [n_samples] with our labels
            y_reg: array of shape [n_samples] with regression values which get split for labels
    """

    if dist is None:
        dist = []
    if col_map is None:
        col_map = {}

    n_total = n_informative + n_redundant + n_nuisance
    x_final = np.zeros((n_samples, n_total))

    pre_data_generation_checks(
        n_informative=n_informative, col_map=col_map, n_total=n_total
    )

    # generate covariance matrix if not handed one
    cov = resolve_covariant(n_informative, covariant=cov)

    # initialize X array
    means = np.zeros(n_informative)
    mvnorm = stats.multivariate_normal(mean=means, cov=cov)
    x = mvnorm.rvs(n_samples, random_state=seed)

    # now transform marginals back to the uniform distribution
    norm = stats.norm()
    x_cont = norm.cdf(x)

    # at this point x_cont has correlated columns with uniform marginals

    # apply the requested marginal distribution to each specified column
    for a_dist in dist:
        col = a_dist["column"]
        x_cont[:, col] = transform_to_distribution(x_cont[:, col], a_dist)
    x_final[:, :n_informative] = x_cont

    # add redundant features, following sklearn's approach:
    # https://github.com/scikit-learn/scikit-learn/blob/fd237278e/sklearn/datasets/_samples_generator.py#L37

    if n_redundant > 0:
        x_redundant = generate_redundant_features(
            x_cont, n_informative, n_redundant, seed
        )
        x_final[:, n_informative : n_informative + n_redundant] = x_redundant

    if n_nuisance > 0:
        # nuisance features carry no signal; use the seeded generator for repeatability
        generator = np.random.RandomState(seed)
        x_nuis = generator.normal(1.0, 0.3, size=(n_samples, n_nuisance))
        x_final[:, -n_nuisance:] = x_nuis

    # rescale all features to the range [-1, 1]
    scaler = MinMaxScaler(feature_range=[-1, 1])
    x_final = scaler.fit_transform(x_final)

    # apply the expression to each sample, using only the mapped columns
    col_list = list(col_map.values())
    x_filt = x_final[:, col_list]  # equivalent to [:, :n_informative] when col_map covers them in order
    y_reg = np.apply_along_axis(eval_expr_for_sample, 1, x_filt, col_map, expr)
    y_reg = y_reg.astype("float32")

    y_prob = sigmoid(y_reg, k=sig_k, x0=sig_x0)
    y_labels = [1 if p >= p_thresh else 0 for p in y_prob]

    #
    # post processing steps - e.g. add noise
    #

    if noise_level_x > 0.0:
        # noise is added after labels are computed, so labels reflect the noiseless signal
        x_noise = generate_x_noise(x_final[:, :n_informative], noise_level_x, seed=seed)
        x_final[:, :n_informative] = x_final[:, :n_informative] + x_noise

    return x_final, y_reg, y_prob, y_labels


def tuples_to_cov(list_of_tuples, col_map):
    """
    Alternative form for specifying correlation between features, intended
    to be an easier specification, and less error prone (using symbolic names rather than indices)
    Input - list of tuples where each tuple is of the form:
            (sym_i, sym_j, correlation)
    Output - symmetric (n_features x n_features) array of cov
    """

    n_feat = len(col_map.keys())
    cov = np.zeros((n_feat, n_feat))

    for a_tuple in list_of_tuples:
        # map symbols to indices
        i = col_map[a_tuple[0]]
        j = col_map[a_tuple[1]]
        cov[i, j] = cov[j, i] = a_tuple[2]

    for i in range(n_feat):
        cov[i, i] = 1.0

    return cov
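
# A minimal usage sketch (assumed, not part of the original script): build a
# 2 x 2 covariance matrix from one symbolic correlation pair.
#
#   import sympy as sym
#   x1, x2 = sym.symbols("x1 x2")
#   col_map = {x1: 0, x2: 1}
#   tuples_to_cov([(x1, x2, 0.5)], col_map)
#   # -> array([[1. , 0.5],
#   #           [0.5, 1. ]])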