"""binacox.py

Binacox: cut point detection in the high-dimensional Cox model via the
binarsity penalty (https://github.com/SimonBussy/binacox).
"""

import numpy as np
import pandas as pd
from scipy.stats import norm
from sklearn.model_selection import KFold
from joblib import Parallel, delayed
from tick.preprocessing.features_binarizer import FeaturesBinarizer
from tick.survival import CoxRegression
from lifelines.utils import concordance_index
import statsmodels.api as sm
import matplotlib.pyplot as pl
import warnings
warnings.filterwarnings('ignore')


def compute_score(features, features_binarized, times, censoring,
                  blocks_start, blocks_length, boundaries, C=10, n_folds=10,
                  features_names=None, shuffle=True, n_jobs=1, verbose=False,
                  validation_data=None):
    scores = cross_val_score(features, features_binarized, times,
                             censoring, blocks_start, blocks_length, boundaries,
                             n_folds=n_folds, shuffle=shuffle, C=C,
                             features_names=features_names, n_jobs=n_jobs,
                             verbose=verbose, validation_data=validation_data)
    scores_test = scores[:, 0]
    scores_validation = scores[:, 1]
    if validation_data is not None:
        scores_validation_mean = scores_validation.mean()
        scores_validation_std = scores_validation.std()
    else:
        scores_validation_mean, scores_validation_std = None, None

    scores_mean = scores_test.mean()
    scores_std = scores_test.std()
    if verbose:
        print("\nscore %0.3f (+/- %0.3f)" % (scores_mean, scores_std))
    scores = [scores_mean, scores_std, scores_validation_mean,
              scores_validation_std]
    return scores
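
# Usage sketch (hedged; `X`, `times`, `censoring` are assumed inputs, not
# defined in this module). The binarized features and the block structure
# come from tick's FeaturesBinarizer, whose fitted `blocks_start` and
# `blocks_length` attributes are used the same way later in this file
# (`binarizer.boundaries` is assumed to expose the fitted bin boundaries):
#
#     binarizer = FeaturesBinarizer(n_cuts=50)
#     X_bin = binarizer.fit_transform(X)
#     scores = compute_score(X, X_bin, times, censoring,
#                            binarizer.blocks_start, binarizer.blocks_length,
#                            binarizer.boundaries, C=10, n_folds=10)
#     # scores == [test_mean, test_std, validation_mean, validation_std]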


def cross_val_score(features, features_binarized, times, censoring,
                    blocks_start, blocks_length, boundaries, n_folds, shuffle,
                    C, features_names, n_jobs, verbose, validation_data):
    cv = KFold(n_splits=n_folds, shuffle=shuffle)
    cv_iter = list(cv.split(features))

    parallel = Parallel(n_jobs=n_jobs, verbose=verbose)
    scores = parallel(
        delayed(fit_and_score)(features, features_binarized, times,
                               censoring, blocks_start, blocks_length,
                               boundaries, features_names, idx_train, idx_test,
                               validation_data, C)
        for (idx_train, idx_test) in cv_iter)
    return np.array(scores)


def fit_and_score(features, features_bin, times, censoring,
                  blocks_start, blocks_length, boundaries, features_names,
                  idx_train, idx_test, validation_data, C):
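    # Two-step fit on one CV split: (1) detect cut points by fitting a Cox
    # model with the binarsity penalty on the binarized features; (2)
    # re-binarize the raw features at those cut points and refit a virtually
    # unpenalized Cox model (C=1e10) to score the held-out fold.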
    if features_names is None:
        features_names = [str(j) for j in range(features.shape[1])]
    X_train, X_test = features_bin[idx_train], features_bin[idx_test]
    Y_train, Y_test = times[idx_train], times[idx_test]
    delta_train, delta_test = censoring[idx_train], censoring[idx_test]

    learner = CoxRegression(penalty='binarsity', tol=1e-5,
                            verbose=False, max_iter=100, step=0.3,
                            blocks_start=blocks_start,
                            blocks_length=blocks_length,
                            warm_start=True)
    learner._solver_obj.linesearch = False
    learner.C = C
    learner.fit(X_train, Y_train, delta_train)
    coeffs = learner.coeffs

    # Extract the estimated cut points feature by feature: within a feature's
    # block, the boundaries at which the (grouped) coefficient value jumps
    # are the estimated cut points.
    cut_points_estimates = {}
    for j, start in enumerate(blocks_start):
        coeffs_j = coeffs[start:start + blocks_length[j]]
        all_zeros = not np.any(coeffs_j)
        if all_zeros:
            cut_points_estimate_j = np.array([-np.inf, np.inf])
        else:
            groups_j = get_groups(coeffs_j)
            jump_j = np.where(groups_j[1:] - groups_j[:-1] != 0)[0] + 1
            if jump_j.size == 0:
                cut_points_estimate_j = np.array([-np.inf, np.inf])
            else:
                cut_points_estimate_j = boundaries[features_names[j]][
                    jump_j]
                if cut_points_estimate_j[0] != -np.inf:
                    cut_points_estimate_j = np.insert(cut_points_estimate_j,
                                                      0, -np.inf)
                if cut_points_estimate_j[-1] != np.inf:
                    cut_points_estimate_j = np.append(cut_points_estimate_j,
                                                      np.inf)
        cut_points_estimates[features_names[j]] = cut_points_estimate_j
    # Re-binarize the raw features at the estimated cut points for the refit.
    binarizer = FeaturesBinarizer(method='given',
                                  bins_boundaries=cut_points_estimates)
    binarized_features = binarizer.fit_transform(features)
    blocks_start = binarizer.blocks_start
    blocks_length = binarizer.blocks_length
    X_bin_train = binarized_features[idx_train]
    X_bin_test = binarized_features[idx_test]
    learner_ = CoxRegression(penalty='binarsity', tol=1e-5,
                             verbose=False, max_iter=100, step=0.3,
                             blocks_start=blocks_start,
                             blocks_length=blocks_length,
                             warm_start=True, C=1e10)
    learner_._solver_obj.linesearch = False
    learner_.fit(X_bin_train, Y_train, delta_train)
    score = learner_.score(X_bin_test, Y_test, delta_test)

    if validation_data is not None:
        X_validation = validation_data[0]
        X_bin_validation = binarizer.fit_transform(X_validation)
        Y_validation = validation_data[1]
        delta_validation = validation_data[2]
        score_validation = learner_.score(X_bin_validation, Y_validation,
                                          delta_validation)
    else:
        score_validation = None

    return score, score_validation


def get_groups(coeffs):
    # Label maximal runs of equal consecutive coefficients; `jumps` holds the
    # start index of each run (plus the array length as a sentinel).
    n_coeffs = len(coeffs)
    jumps = np.where(coeffs[1:] - coeffs[:-1] != 0)[0] + 1
    jumps = np.insert(jumps, 0, 0)
    jumps = np.append(jumps, n_coeffs)
    groups = np.zeros(n_coeffs)
    for i in range(len(jumps) - 1):
        groups[jumps[i]:jumps[i + 1]] = i
        if jumps[i + 1] - jumps[i] <= 2:
            if i == 0:
                groups[jumps[i]:jumps[i + 1]] = 1
            elif i == len(jumps) - 2:
                groups[jumps[i]:jumps[i + 1]] = groups[jumps[i - 1]]
            else:
                coeff_value = coeffs[jumps[i]]
                group_before = groups[jumps[i - 1]]
                coeff_value_before = coeffs[
                    np.nonzero(groups == group_before)[0][0]]
                try:
                    # Look ahead for the stable value of the following group,
                    # skipping any further single-value runs.
                    k = 0
                    while coeffs[jumps[i + 1] + k] != coeffs[
                                        jumps[i + 1] + k + 1]:
                        k += 1
                    coeff_value_after = coeffs[jumps[i + 1] + k]
                except IndexError:
                    coeff_value_after = coeffs[jumps[i + 1]]
                if np.abs(coeff_value_before - coeff_value) < np.abs(
                                coeff_value_after - coeff_value):
                    groups[np.where(groups == i)] = group_before
                else:
                    groups[np.where(groups == i)] = i + 1
    return groups.astype(int)
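
# Example: get_groups labels runs of equal consecutive coefficients, merging
# any run of length <= 2 into the neighboring run with the closest value:
#
#     get_groups(np.array([0., 0., 0., 5., 1., 1., 1.]))
#     # -> array([0, 0, 0, 2, 2, 2, 2]): the isolated 5. joins the 1.-run.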


def get_m_1(cut_points_estimates, cut_points, S):
    # m_1: Hausdorff distance between estimated and true cut points, averaged
    # over the features outside S that received at least one estimated cut.
    m_1, d = 0, 0
    n_features = len(cut_points)
    for j in set(range(n_features)) - set(S):
        mu_star_j = cut_points[str(j)][1:-1]
        hat_mu_star_j = cut_points_estimates[str(j)][1:-1]
        if len(hat_mu_star_j) > 0:
            d += 1
            m_1 += get_H(mu_star_j, hat_mu_star_j)
    if d == 0:
        m_1 = np.nan
    else:
        m_1 *= (1 / d)
    return m_1


def get_H(A, B):
    return max(get_E(A, B), get_E(B, A))


def get_E(A, B):
    return max([min([abs(a - b) for a in A]) for b in B])
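
# get_H is the Hausdorff distance between two finite sets of cut points,
# with get_E(A, B) = max_{b in B} min_{a in A} |a - b|. For instance:
#
#     get_H([1, 5], [2, 9])  # -> 4 (the point 9 is at distance 4 from {1, 5})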


def get_m_2(hat_K_star, S):
    return (1 / len(S)) * hat_K_star[S].sum()


def plot_screening(screening_strategy, screening_marker, cancer, P):
    fig = pl.figure()
    ax = fig.add_subplot(111)
    alpha = .8
    lw = 2
    label = 'Selected'
    n_features = len(screening_marker)
    ax.plot(range(P), screening_marker[:P], 'r',
            lw=lw, alpha=alpha, label=label)
    label = 'Rejected'
    ax.plot(range(P, n_features), screening_marker[P:],
            'b', lw=lw, alpha=alpha, label=label)
    pl.legend(fontsize=18)
    pl.xlabel(r'$j$', fontsize=25)
    pl.tick_params(axis='x', which='both', top=False)
    pl.xticks(fontsize=18)
    pl.yticks(fontsize=18)
    pl.title("%s screening on %s" % (screening_strategy, cancer),
             fontsize=20)
    pl.tight_layout()
    pl.show()


def get_p_values_j(feature, mu_k, times, censoring, values_to_test, epsilon):
    if values_to_test is None:
        # Restrict candidate cut points to those between the epsilon-th and
        # (100 - epsilon)-th percentiles of the feature.
        p1 = np.percentile(feature, epsilon)
        p2 = np.percentile(feature, 100 - epsilon)
        values_to_test = mu_k[np.where((mu_k <= p2) & (mu_k >= p1))]
    p_values, t_values = [], []
    for val in values_to_test:
        feature_bin = feature <= val
        mod = sm.PHReg(endog=times, status=censoring,
                       exog=feature_bin.astype(int), ties="efron")
        fitted_model = mod.fit()
        p_values.append(fitted_model.pvalues[0])
        t_values.append(fitted_model.tvalues[0])
    p_values = pd.DataFrame({'values_to_test': values_to_test,
                             'p_values': p_values,
                             't_values': t_values})
    p_values.sort_values('values_to_test', inplace=True)
    return p_values
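
# Hedged sketch for a single feature (`x`, `mu_k`, `times`, `censoring` are
# assumed inputs): one univariate Cox model is fitted per candidate cut point,
# on the indicator x <= val, and the minimal p-value can then be corrected
# with `p_value_cut` defined below:
#
#     res = get_p_values_j(x, mu_k, times, censoring, None, epsilon=5)
#     p_adj = p_value_cut(res.p_values.values, res.values_to_test.values, x)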


def multiple_testing(X, boundaries, Y, delta, values_to_test=None,
                     features_names=None, epsilon=5):
    if values_to_test is None:
        values_to_test = X.shape[1] * [None]
    if features_names is None:
        features_names = [str(j) for j in range(X.shape[1])]
    X = np.array(X)
    result = Parallel(n_jobs=5)(
        delayed(get_p_values_j)(X[:, j],
                                boundaries[features_names[j]].copy()[1:-1], Y,
                                delta, values_to_test[j], epsilon=epsilon)
        for j in range(X.shape[1]))
    return result


def t_ij(i, j, n):
    # Correlation-based term used in the improved correction for maximally
    # selected statistics (Lausen & Schumacher, 1992).
    return (1 - i * (n - j) / ((n - i) * j)) ** .5


def d_ij(i, j, z, n):
    return (2 / np.pi) ** .5 * norm.pdf(z) * (
        t_ij(i, j, n) - (z ** 2 / 4 - 1) * t_ij(i, j, n) ** 3 / 6)


def p_value_cut(p_values, values_to_test, feature, epsilon=5):
    # Correct the minimal p-value over all tested cut points for the implicit
    # multiple testing (maximally selected statistics); the smaller of the
    # two corrections below, capped at 1, is returned.
    n_tested = p_values.size
    p_value_min = np.min(p_values)
    l = np.zeros(n_tested)
    l[-1] = n_tested
    d = np.zeros(n_tested - 1)
    z = norm.ppf(1 - p_value_min / 2)
    values_to_test_sorted = np.sort(values_to_test)

    # Asymptotic correction of Lausen & Schumacher (1992):
    # phi(z) * (z - 1 / z) * log((1 - eps)^2 / eps^2) + 4 * phi(z) / z.
    epsilon /= 100
    p_corr_1 = norm.pdf(z) * (z - 1 / z) * np.log(
        (1 - epsilon) ** 2 / epsilon ** 2) + 4 * norm.pdf(z) / z

    # Improved correction: add the D-terms between successive cut points.
    for i in np.arange(n_tested - 1):
        l[i] = np.count_nonzero(feature <= values_to_test_sorted[i])
        if i >= 1:
            d[i - 1] = d_ij(l[i - 1], l[i], z, feature.shape[0])
    p_corr_2 = p_value_min + np.sum(d)

    p_value_min_corrected = np.min((p_corr_1, p_corr_2, 1))
    if np.isnan(p_value_min_corrected) or np.isinf(p_value_min_corrected):
        p_value_min_corrected = p_value_min
    return p_value_min_corrected


def multiple_testing_perm(n_samples, X, boundaries, Y, delta,
                          values_to_test_init, features_names, epsilon):
    # Recompute the cut-point test statistics on a bootstrap resample
    # (sampling individuals with replacement).
    np.random.seed()  # reseed so each parallel worker gets its own stream
    perm = np.random.choice(n_samples, size=n_samples, replace=True)
    multiple_testing_rslt = multiple_testing(X[perm], boundaries, Y[perm],
                                             delta[perm], values_to_test_init,
                                             features_names=features_names,
                                             epsilon=epsilon)
    return multiple_testing_rslt


def bootstrap_cut_max_t(X, boundaries, Y, delta, multiple_testing_rslt, B=10,
                        features_names=None, epsilon=5):
    if features_names is None:
        features_names = [str(j) for j in range(X.shape[1])]
    n_samples, n_features = X.shape
    t_values_init, values_to_test_init, t_values_B = [], [], []
    for j in range(n_features):
        t_values_init.append(multiple_testing_rslt[j].t_values)
        values_to_test_j = multiple_testing_rslt[j].values_to_test
        values_to_test_init.append(values_to_test_j)
        n_tested_j = values_to_test_j.size
        t_values_B.append(pd.DataFrame(np.zeros((B, n_tested_j))))

    result = Parallel(n_jobs=10)(
        delayed(multiple_testing_perm)(n_samples, X, boundaries, Y, delta,
                                       values_to_test_init, features_names,
                                       epsilon)
        for _ in np.arange(B))

    for b in np.arange(B):
        for j in range(n_features):
            t_values_B[j].iloc[b, :] = result[b][j].t_values

    # max-T adjusted p-values: compare each observed |t| to the bootstrap
    # distribution of the per-feature maximal standardized |t|.
    adjusted_p_values = []
    for j in range(n_features):
        sd = t_values_B[j].std(0)
        sd[sd < 1] = 1
        mean = t_values_B[j].mean(0)
        t_val_B_H0_j = (t_values_B[j] - mean) / sd
        maxT_j = t_val_B_H0_j.abs().max(1)
        adjusted_p_values.append(
            [(maxT_j > np.abs(t_k)).mean() for t_k in t_values_init[j]])
    return adjusted_p_values
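
# Hedged sketch of the max-T pipeline (`X`, `boundaries`, `Y`, `delta` are
# assumed inputs): compute the per-cut-point statistics once, then bootstrap:
#
#     mt_rslt = multiple_testing(X, boundaries, Y, delta)
#     adj_p = bootstrap_cut_max_t(X, boundaries, Y, delta, mt_rslt, B=100)
#     # adj_p[j][k]: adjusted p-value of the k-th tested cut of feature j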


def refit_and_predict(cut_points_estimates, X_train, X_test, Y_train,
                      delta_train, Y_test, delta_test):

    binarizer = FeaturesBinarizer(method='given',
                                  bins_boundaries=cut_points_estimates,
                                  remove_first=True)
    binarizer.fit(pd.concat([X_train, X_test]))
    X_bin_train = binarizer.transform(X_train)
    X_bin_test = binarizer.transform(X_test)

    learner = CoxRegression(penalty='none', tol=1e-5,
                            solver='agd', verbose=False,
                            max_iter=100, step=0.3,
                            warm_start=True)
    learner._solver_obj.linesearch = False
    learner.fit(X_bin_train, Y_train, delta_train)
    coeffs = learner.coeffs
    marker = X_bin_test.dot(coeffs)
    lp_train = X_bin_train.dot(coeffs)
    c_index = concordance_index(Y_test, marker, delta_test)
    # The orientation of the linear predictor is arbitrary; keep the better
    # of the two concordances.
    c_index = max(c_index, 1 - c_index)

    return c_index, marker, lp_train
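
# Hedged sketch (`cuts`, `X_train`, `X_test`, `Y_train`, `delta_train`,
# `Y_test`, `delta_test` are assumed inputs; the X's are pandas DataFrames):
#
#     c_index, marker, lp_train = refit_and_predict(
#         cuts, X_train, X_test, Y_train, delta_train, Y_test, delta_test)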