# model.py
import os
import time
import numpy as np
import pandas as pd
# optional QP solvers used by the NNLS model below; imported guarded so the
# module still loads when they are not installed
try:
    import quadprog as qp
except ImportError:
    qp = None
try:
    import cvxopt
except ImportError:
    cvxopt = None
from scipy import sparse
from sklearn.base import BaseEstimator
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Ridge
from sklearn.linear_model import Lasso
from sklearn.linear_model import LassoCV
from sklearn.linear_model import ElasticNet
from sklearn.decomposition import PCA
from sklearn.cross_decomposition import PLSRegression
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import KFold
from sklearn.feature_selection import SequentialFeatureSelector
from sklearn.feature_selection import RFE
from sklearn.feature_selection import RFECV
from sklearn.base import clone
from scipy import stats
import connectivity.evaluation as ev # will be used in stepwise regression


"""
connectivity models
A connectivity model is inherited from the sklearn class BaseEstimator
such that Ridge, Lasso, ElasticNet and other models can
be easily used.

@authors: Maedbh King, Ladan Shahshahani, Jörn Diedrichsen
"""


class ModelMixin:
    """
    This is a class that can give use extra behaviors or functions that we want our connectivity models to have - over an above the basic functionality provided by the stanard SK-learn BaseEstimator classes
    As an example here is a function that serializes the fitted model
    Not used right now, but maybe potentially useful. Note that Mixin classes do not have Constructor!
    """

    def to_dict(self):
        data = {"coef_": self.coef_}
        return data

class L2regression(Ridge, ModelMixin):
    """
    L2 regularized connectivity model
    simple wrapper for Ridge. It performs scaling by stdev, but not by mean before fitting and prediction
    """

    def __init__(self, alpha=1):
        """
        Simply calls the superordinate constructor - but does not fit an intercept, as this is tightly controlled in Dataset.get_data()
        """
        super().__init__(alpha=alpha, fit_intercept=False)

    def fit(self, X, Y):
        self.scale_ = np.sqrt(np.nansum(X ** 2, 0) / X.shape[0])
        Xs = X / self.scale_
        Xs = np.nan_to_num(Xs) # there are 0 values after scaling
        return super().fit(Xs, Y)

    def predict(self, X):
        Xs = X / self.scale_
        Xs = np.nan_to_num(Xs) # there are 0 values after scaling
        return Xs @ self.coef_.T  # weights need to be transposed (throws error otherwise)
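
# A minimal usage sketch (illustration only; `_demo_l2regression` and the data
# shapes are assumptions, not part of the original API): shows the expected
# input/output shapes of the fit/predict convention used throughout this module.
def _demo_l2regression():
    X = np.random.randn(60, 100)   # N conditions x P1 cortical regions
    Y = np.random.randn(60, 500)   # N conditions x P2 cerebellar voxels
    model = L2regression(alpha=10)
    model.fit(X, Y)                # X is rescaled to unit RMS per column
    return model.predict(X)        # predicted Y, shape (60, 500)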

class LASSO(Lasso, ModelMixin):
    """
    L2 regularized connectivity model
    simple wrapper for Ridge. It performs scaling by stdev, but not by mean before fitting and prediction
    """

    def __init__(self, alpha=1):
        """
        Simply calls the superordinate constructor - but does not fit an intercept, as this is tightly controlled in Dataset.get_data()
        """
        super().__init__(alpha=alpha, fit_intercept=False)

    def fit(self, X, Y):
        self.scale_ = np.sqrt(np.nansum(X ** 2, 0) / X.shape[0])
        Xs = X / self.scale_
        Xs = np.nan_to_num(Xs) # there are 0 values after scaling
        return super().fit(Xs, Y)

    def predict(self, X):
        Xs = X / self.scale_
        Xs = np.nan_to_num(Xs) # there are 0 values after scaling
        return Xs @ self.coef_.T  # weights need to be transposed (throws error otherwise)

class WTA_OLD(LinearRegression, ModelMixin):
    """
    WTA model
    It performs scaling by stdev, but not by mean before fitting and prediction
    """

    def __init__(self, positive=False):
        """
        Simply calls the superordinate constructor - but does not fit an intercept, as this is tightly controlled in Dataset.get_data()
        """
        super().__init__(positive=positive, fit_intercept=False)

    def fit(self, X, Y):
        self.scale_ = np.sqrt(np.sum(X ** 2, 0) / X.shape[0])
        Xs = X / self.scale_
        Xs = np.nan_to_num(Xs) # there are 0 values after scaling
        super().fit(Xs, Y)
        self.labels = np.argmax(self.coef_, axis=1)
        wta_coef_ = np.amax(self.coef_, axis=1)
        self.coef_ = np.zeros((self.coef_.shape))
        num_vox = self.coef_.shape[0]
        self.coef_[np.arange(num_vox), self.labels] = wta_coef_ # keep only the winning weight per voxel
        self.labels = self.labels + 1 # we don't want zero-indexed label
        return self.coef_, self.labels

    def predict(self, X):
        Xs = X / self.scale_
        Xs = np.nan_to_num(Xs) # there are 0 values after scaling
        return Xs @ self.coef_.T  # weights need to be transposed (throws error otherwise)

class WTA(BaseEstimator, ModelMixin):
    """
    WTA model
    It performs scaling by stdev, but not by mean before fitting and prediction
    """

    def __init__(self):
        """
        Simply calls the superordinate construction - but does not fit intercept, as this is tightly controlled in Dataset.get_data()
        """
        super().__init__()

    def fit(self, X, Y):
        self.scale_ = np.sqrt(np.sum(X ** 2, 0) / X.shape[0])
        Xs = X / self.scale_
        Xs = np.nan_to_num(Xs) # there are 0 values after scaling
        self.coef_ = Y.T @ Xs  # This is the correlation (non-standardized)
        self.labels = np.argmax(self.coef_, axis=1)
        wta_coef_ = np.amax(self.coef_, axis=1)
        self.coef_ = np.zeros((self.coef_.shape))
        num_vox = self.coef_.shape[0]
        self.coef_[np.arange(num_vox), self.labels] = wta_coef_
        self.labels = self.labels + 1 # we don't want zero-indexed label
        return self.coef_, self.labels

    def predict(self, X):
        Xs = X / self.scale_
        Xs = np.nan_to_num(Xs) # there are 0 values after scaling
        return Xs @ self.coef_.T  # weights need to be transposed (throws error otherwise)
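
# A minimal sketch (illustration only; `_demo_wta` and the data shapes are not
# part of the original API): after fitting, each voxel keeps exactly one
# non-zero weight, and `labels` holds the 1-based index of the winning
# cortical column.
def _demo_wta():
    X = np.random.randn(60, 100)
    Y = np.random.randn(60, 500)
    model = WTA()
    coef, labels = model.fit(X, Y)               # fit returns (coef_, labels)
    assert np.all((coef != 0).sum(axis=1) <= 1)  # one winner per voxel
    assert labels.min() >= 1                     # labels are 1-based
    return model.predict(X)                      # shape (60, 500)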


class NNLS(BaseEstimator, ModelMixin):
    """
    Fast implementation of a multivariate Non-negative least squares (NNLS) regression
    Allows for both L2 and L1 penality on regression coefficients (i.e. Elastic-net like).
    Regression model is transformed into a quadratic programming problem and then solved
    using the  quadprog module
    """

    def __init__(self, alpha=0, gamma=0, solver="cvxopt"):
        """
        Constructor. Input:
            alpha (double):
                L2-regularisation (0 def)
            gamma (double):
                L1-regularisation (0 def)
            solver (str):
                library used to solve the quadratic programming problem
                ("quadprog" or "cvxopt")
        """
        self.alpha = alpha
        self.gamma = gamma
        self.solver = solver

    def fit(self, X, Y):
        """
        Fitting of the NNLS model, including scaling of the X matrix.
        For each target voxel y, solves the quadratic program
            min_w 0.5 * w'(X'X + alpha*I)w - (X'y - gamma)'w   s.t.  w >= 0
        """
        N, P1 = X.shape
        P2 = Y.shape[1]
        self.scale_ = np.sqrt(np.sum(X ** 2, 0) / X.shape[0])
        Xs = X / self.scale_
        Xs = np.nan_to_num(Xs) # there are 0 values after scaling
        G = Xs.T @ Xs + np.eye(P1) * self.alpha # quadratic term (includes the L2 penalty)
        a = Xs.T @ Y - self.gamma               # linear term (includes the L1 penalty)
        C = np.eye(P1)                          # inequality constraints: C w >= b, i.e. w >= 0
        b = np.zeros((P1,))
        self.coef_ = np.zeros((P2, P1))
        if (self.solver == "quadprog"):
            for i in range(P2):
                self.coef_[i, :] = qp.solve_qp(G, a[:, i], C, b, 0)[0] # 0 equality constraints
        elif (self.solver == "cvxopt"):
            # cvxopt solves min 0.5 x'Px + q'x  s.t.  Gx <= h, so the signs are flipped
            Gc = cvxopt.matrix(G)
            Cc = cvxopt.matrix(-1 * C) # -I w <= 0  <=>  w >= 0
            bc = cvxopt.matrix(b)
            inVa = cvxopt.matrix(np.zeros((P1,)))
            for i in range(P2):
                ac = cvxopt.matrix(-a[:, i])
                sol = cvxopt.solvers.qp(Gc, ac, Cc, bc, initvals={'x': inVa}) # warm-start from the previous voxel
                self.coef_[i, :] = np.array(sol['x']).reshape((P1,))
                inVa = sol['x']
        else:
            raise ValueError(f"unknown solver: {self.solver}")
        return self


    def predict(self, X):
        Xs = X / self.scale_
        Xs = np.nan_to_num(Xs) # there are 0 values after scaling
        return Xs @ self.coef_.T
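
# A minimal cross-check sketch (illustration only; `_demo_nnls_check` and the
# data shapes are not part of the original API): with alpha=0 and gamma=0 the
# quadratic program above reduces to ordinary non-negative least squares, so
# each row of coef_ should match scipy.optimize.nnls on the same rescaled
# design matrix (up to solver tolerance).
def _demo_nnls_check():
    from scipy.optimize import nnls as scipy_nnls
    cvxopt.solvers.options['show_progress'] = False # silence solver output
    X = np.random.randn(40, 10)
    Y = np.random.randn(40, 3)
    model = NNLS(alpha=0, gamma=0, solver="cvxopt")
    model.fit(X, Y)
    Xs = np.nan_to_num(X / model.scale_)            # same scaling as NNLS.fit
    for i in range(Y.shape[1]):
        w, _ = scipy_nnls(Xs, Y[:, i])              # reference solution, w >= 0
        assert np.allclose(model.coef_[i, :], w, atol=1e-4)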

class PLSRegress(PLSRegression, ModelMixin):
    """
    PLS regression connectivity model
    Thin wrapper around sklearn's PLSRegression; for more info:
        https://scikit-learn.org/stable/modules/generated/sklearn.cross_decomposition.PLSRegression.html
    """

    def __init__(self, n_components=1):
        super().__init__(n_components=n_components)

    def fit(self, X, Y):
        """
        Uses the NIPALS algorithm
        """
        Xs = X / np.sqrt(np.sum(X**2, 0) / X.shape[0]) # control scaling
        Xs = np.nan_to_num(Xs)
        return super().fit(Xs, Y)

    def predict(self, X):
        Xs = X / np.sqrt(np.sum(X**2,0)/X.shape[0]) # Control scaling
        Xs = np.nan_to_num(Xs)
        return super().predict(Xs)
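
# A minimal usage sketch (illustration only; `_demo_pls` and the data shapes
# are not part of the original API): fit/predict follow the same scaled-X
# convention as the other connectivity models in this module.
def _demo_pls():
    X = np.random.randn(60, 100)
    Y = np.random.randn(60, 500)
    model = PLSRegress(n_components=5)
    model.fit(X, Y)
    return model.predict(X)   # shape (60, 500)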

class WINNERS(ModelMixin):
    """
    Greedy forward feature selection: for each voxel, selects the
    n_features_to_select cortical features that best predict it.
    """

    def __init__(self, n_features_to_select = 1):
        self.n_features_to_select = n_features_to_select

    def add_features(self, X, y, selected = None):
        """
        Greedy forward selection:
        1. start with evaluation of individual features
        2. select the one feature that results in the best performance
            ** what counts as "best" depends on the chosen evaluation criterion (here, R)
        3. consider all possible combinations of the selected features and one more feature and select the best combination
        4. repeat 1 to 3 until the desired number of features is selected

        Args:
            X (np.ndarray)  -   design matrix
            y (np.ndarray)  -   response variable (single voxel)
            selected (list) -   indices of already-selected features (default: start from scratch)
        """
        if selected is None:
            selected = [] # avoid the mutable-default-argument pitfall
        remaining = list(set(range(X.shape[1])) - set(selected)) # features that are still to be examined

        # 2. loop over features
        while (remaining) and (len(selected) < self.n_features_to_select): # while features remain and not enough have been selected

            scores = pd.Series(np.empty((len(remaining))), index=remaining) # scores of each candidate feature
            for i in remaining:

                candidates = selected + [i] # features used in the current regression
                # get the candidate features from X
                X_feat = X[:, candidates]

                # fit an OLS model; predictions (and hence R) are invariant to
                # column scaling, so X_feat does not need to be rescaled here
                ## 1. get the regression coefficients
                B = np.linalg.solve(X_feat.T @ X_feat, X_feat.T @ y)
                ## 2. get the predicted values
                y_pred = X_feat @ B

                ## get the score for this candidate set
                score_i, _ = ev.calculate_R(y, y_pred)
                scores.loc[i] = score_i


            # find the feature/feature combination with the best score
            best       = scores.idxmax()
            selected.append(best)

            # update remaining
            ## remove the selected feature/features from remaining
            remaining.remove(best)

        return selected

    def set_support_(self, X, Y, support_ = None):
        """
        gets the support (and updates it) for all the voxels
        support_ can then be used to get the selected features
        Args:
            X (ndarray)         : contains regressors (cortical regions)
            Y (ndarray)         : contains responses (cerebellar voxels)
            support_ (ndarray)  : initial mask to select features. None: start from scratch with all zeros
        """

        if support_ is None:
            # starting from scratch
            self.support_= np.zeros((Y.shape[1], X.shape[1]))
        else:
            self.support_ = support_

        # loop over voxels
        for vox in range(Y.shape[1]):
            # print(vox, end = "", flush=True)

            if np.any(Y[:, vox]):
                # get the selected features for the current voxel
                initial_feats = list(np.where(self.support_[vox, :] == 1)[0])

                # add features to the selected set
                feats = self.add_features(X, Y[:, vox], selected = initial_feats)

                # update support
                self.support_[vox, feats] = 1

        return self.support_
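
# A minimal usage sketch of the greedy forward selection (illustration only;
# `_demo_winners` and the data shapes are not part of the original API; it
# relies on ev.calculate_R from connectivity.evaluation for scoring):
# set_support_ returns a (n_voxels x n_features) 0/1 mask with
# n_features_to_select winners per voxel.
def _demo_winners():
    X = np.random.randn(60, 20)
    Y = np.random.randn(60, 5)
    model = WINNERS(n_features_to_select=3)
    support = model.set_support_(X, Y)
    assert support.shape == (5, 20)
    assert np.all(support.sum(axis=1) == 3) # exactly three winners per voxel
    return support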

class WNTA(Ridge, ModelMixin):

    def __init__(self, winner_model = None, alpha = 0, n_features_to_select = 1):
        """
        Should be initialized with an instance of the WINNERS class.
        If None is passed, it starts from scratch: it creates a WINNERS
        instance and gets the support_ for selecting features. Otherwise, it
        uses the support_ attribute of the given WINNERS instance.
        """

        super().__init__(alpha = alpha, fit_intercept = False)

        if winner_model is None:
            # initialize a winner model class
            self.winner_model = WINNERS(n_features_to_select = n_features_to_select)
        else:
            self.winner_model = winner_model


        self.n_features_to_select = n_features_to_select

    def fit(self, X, Y):

        # get the scaling
        self.scale_ = np.sqrt(np.nansum(X ** 2, 0) / X.shape[0])

        # first get the winners
        if hasattr(self.winner_model, "support_"): # support_ exists: selection has already been (at least partly) done
            self.winner_model.n_features_to_select = self.n_features_to_select # update the number of features to select
            self.feature_mask = self.winner_model.set_support_(X, Y, self.winner_model.support_)
        else: # selection hasn't been done yet, so do it
            self.feature_mask = self.winner_model.set_support_(X, Y)

        # loop over voxels and fit ridge
        wnta_coef = np.zeros((Y.shape[1], X.shape[1]))
        for vox in range(Y.shape[1]):

            if np.any(Y[:, vox]):
                # get the selected features for the current voxel
                selected_vox = list(np.where(self.feature_mask[vox, :] == 1)[0])

                ## use the selected features to do a ridge regression
                X_selected = X[:, selected_vox]

                ### scale X_selected
                scale_ = np.sqrt(np.nansum(X_selected ** 2, 0) / X_selected.shape[0])
                Xs = X_selected / scale_
                Xs = np.nan_to_num(Xs) # there are 0 values after scaling

                # fit a ridge regression on the selected features only
                super().fit(Xs, Y[:, vox])

                # fill in the elements of the coef
                wnta_coef[vox, selected_vox] = self.coef_

        # set the coef_ attribute
        self.coef_ = wnta_coef
        return self

    def predict(self, X):
        Xs = X / self.scale_
        Xs = np.nan_to_num(Xs) # there are 0 values after scaling
        return Xs @ self.coef_.T  # weights need to be transposed (throws error otherwise)
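
# A minimal usage sketch (illustration only; `_demo_wnta` and the data shapes
# are not part of the original API): WNTA first runs the WINNERS forward
# selection to find each voxel's best cortical features, then refits a ridge
# model restricted to those features.
def _demo_wnta():
    X = np.random.randn(60, 20)
    Y = np.random.randn(60, 5)
    model = WNTA(alpha=1.0, n_features_to_select=3)
    model.fit(X, Y)
    assert np.all((model.coef_ != 0).sum(axis=1) <= 3) # at most 3 weights per voxel
    return model.predict(X) # shape (60, 5)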