# Scraped-page header converted to comments so the module parses as Python.
# Source repository: https://github.com/majdabd/lpfc_gradients-meta-analysis
# Tip revision: 7ab4efcbc9875b92745b7b1c43864d7352fd3b90
# authored by Majd on 08 September 2022, 15:01:58 UTC ("add correct file")
# File: data_utils.py
import datetime
import os
from pathlib import Path
from typing import Callable, Optional, Tuple, Union

import neurolang
import nibabel
import nilearn.datasets
import nilearn.datasets.utils
import nilearn.image
import nilearn.input_data
import numpy as np
import pandas as pd
import scipy.sparse
import nibabel as nib
import matplotlib.pyplot as plt
from sklearn.preprocessing import minmax_scale
from matplotlib.colors import LinearSegmentedColormap
from matplotlib.colors import ListedColormap
from neurolang.frontend.neurosynth_utils import fetch_neurosynth_peak_data, fetch_feature_data, fetch_study_metadata

# Root of neurolang's bundled data directory; downloaded atlases and
# corpora are cached beneath it.
DATA_DIR = Path(neurolang.__path__[0]) / "data"


# OSF download ids for the DiFuMo dictionaries, keyed by the number of
# components (dictionary size).
DIFUMO_N_COMPONENTS_TO_DOWNLOAD_ID = {
    128: "wjvd5",
    256: "3vrct",
    512: "9b76y",
    1024: "34792",
}


def tal2mni(coords):
    """Convert coordinates from Talairach space to MNI space.
    .. versionchanged:: 0.0.8
        * [ENH] This function was part of `nimare.transforms` in previous versions (0.0.3-0.0.7)
    Parameters
    ----------
    coords : (X, 3) :obj:`numpy.ndarray`
        Coordinates in Talairach space to convert.
        Each row is a coordinate, with three columns.
    Returns
    -------
    coords : (X, 3) :obj:`numpy.ndarray`
        Coordinates in MNI space.
        Each row is a coordinate, with three columns.
    Notes
    -----
    Python version of BrainMap's tal2icbm_other.m.
    This function converts coordinates from Talairach space to MNI
    space (normalized using templates other than those contained
    in SPM and FSL) using the tal2icbm transform developed and
    validated by Jack Lancaster at the Research Imaging Center in
    San Antonio, Texas.
    http://www3.interscience.wiley.com/cgi-bin/abstract/114104479/ABSTRACT
    """

    coords = coords.transpose()
    # Transformation matrices, different for each software package
    icbm_other = np.array(
        [
            [0.9357, 0.0029, -0.0072, -1.0423],
            [-0.0065, 0.9396, -0.0726, -1.3940],
            [0.0103, 0.0752, 0.8967, 3.6475],
            [0.0000, 0.0000, 0.0000, 1.0000],
        ]
    )

    # Invert the transformation matrix
    icbm_other = np.linalg.inv(icbm_other)

    # Apply the transformation matrix
    coords = np.concatenate((coords, np.ones((1, coords.shape[1]))))
    coords = np.dot(icbm_other, coords)

    # Format the output, transpose if necessary
    out_coords = coords[:3, :]

    out_coords = out_coords.transpose()
    return out_coords.astype("int")


def rescale(data):
    """Min-max rescale *data* to the range [0, 1].

    Parameters
    ----------
    data : array-like exposing ``min``/``max`` (numpy array, pandas Series)

    Returns
    -------
    Rescaled data of the same shape, with the minimum mapped to 0 and the
    maximum mapped to 1.

    Notes
    -----
    If ``data`` is constant (max == min) this divides by zero and yields
    NaN/inf; callers are expected to pass non-constant data.
    """
    return (data - data.min()) / (data.max() - data.min())


def coords_to_voxels(
    coords: np.ndarray,
    mask: "nilearn.input_data.NiftiMasker",
) -> np.ndarray:
    """Map world-space (x, y, z) coordinates to voxel indices of *mask*.

    Parameters
    ----------
    coords : (N, 3) :obj:`numpy.ndarray`
        World-space coordinates, one row each.
    mask : object exposing ``affine`` (4x4) and ``shape`` attributes —
        annotation kept as a string so the module does not need nilearn
        at annotation-evaluation time; ``np.array`` (a function) was also
        replaced by the actual type ``np.ndarray``.

    Returns
    -------
    (M, 3) int :obj:`numpy.ndarray`
        Voxel indices; coordinates falling outside the mask's grid are
        dropped, so M <= N.
    """
    affine = mask.affine
    coords = np.atleast_2d(coords)
    # Homogeneous coordinates -> voxel space via the affine's pseudo-inverse.
    coords = np.hstack([coords, np.ones((len(coords), 1))])
    voxels = np.linalg.pinv(affine).dot(coords.T)[:-1].T
    # Discard voxels outside the image grid before flooring.
    voxels = voxels[(voxels >= 0).all(axis=1)]
    voxels = voxels[(voxels < mask.shape[:3]).all(axis=1)]
    voxels = np.floor(voxels)
    return voxels.astype("int")


def fetch_neurosynth(
    tfidf_threshold: Optional[float] = None,
    data_dir: str = "/Users/majdabdallah/Desktop/Work/",
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Fetch Neurosynth term (tf-idf) and peak-coordinate data.

    Parameters
    ----------
    tfidf_threshold : float, optional
        If given, keep only (term, study) pairs whose tfidf exceeds it and
        drop the tfidf column; otherwise keep all pairs with tfidf > 0
        together with their tfidf values.
    data_dir : str, optional
        Directory where the Neurosynth files are downloaded/cached.  This
        was previously hard-coded; the old path is kept as the default for
        backward compatibility — pass your own path on other machines.

    Returns
    -------
    term_data, peak_data, study_ids : pandas DataFrames
        Long-format term/tfidf table, peak coordinates in MNI space
        (Talairach peaks are projected via :func:`tal2mni`), and the
        unique study ids.
    """
    features = fetch_feature_data(data_dir=data_dir)
    metadata = fetch_study_metadata(data_dir=data_dir)
    # NOTE(review): assumes `features` rows are aligned 1:1 with
    # `metadata` rows — confirm against neurolang's fetcher docs.
    features["study_id"] = metadata["id"].tolist()
    # NOTE(review): reset_index() turns the original index into an extra
    # "index" column which is also melted below, as in the original code.
    features = features.reset_index()
    term_data = pd.melt(
        features,
        id_vars="study_id",
        var_name="term",
        value_name="tfidf",
    )
    if tfidf_threshold is not None:
        term_data = term_data.query("tfidf > {}".format(tfidf_threshold))[
            ["term", "study_id"]
        ]
    else:
        term_data = term_data.query("tfidf > 0")[["term", "tfidf", "study_id"]]

    activations = fetch_neurosynth_peak_data(data_dir=data_dir)
    mni_peaks = activations.loc[activations.space == "MNI"][
        ["x", "y", "z", "id"]
    ].rename(columns={"id": "study_id"})
    non_mni_peaks = activations.loc[activations.space != "MNI"][
        ["x", "y", "z", "id"]
    ].rename(columns={"id": "study_id"})

    # Project non-MNI (Talairach) peaks into MNI space.
    projected = tal2mni(non_mni_peaks[["x", "y", "z"]].values)
    projected_df = pd.DataFrame(
        np.hstack([projected, non_mni_peaks[["study_id"]].values]),
        columns=["x", "y", "z", "study_id"],
        dtype=int,
    )
    peak_data = pd.concat([projected_df, mni_peaks]).astype(int)
    study_ids = peak_data[["study_id"]].drop_duplicates()
    return term_data, peak_data, study_ids


def fetch_neuroquery(
    mask: nilearn.input_data.NiftiMasker,
    data_dir: Path = DATA_DIR,
    tfidf_threshold: Optional[float] = None,
    coord_type: str = "xyz",
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Download (if needed) and load the NeuroQuery corpus.

    Parameters
    ----------
    mask : NiftiMasker (or image) whose affine/shape define the voxel grid
        used when ``coord_type == "ijk"``.
    data_dir : base directory for the downloaded files.
    tfidf_threshold : if given, keep only (term, study) pairs whose tfidf
        exceeds it (tfidf column dropped); otherwise keep all pairs with
        tfidf > 0 including their tfidf values.
    coord_type : "xyz" (world coordinates) or "ijk" (voxel indices).

    Returns
    -------
    term_data, peak_reported, study_ids : pandas DataFrames
    """
    # Use raw.githubusercontent.com: GitHub "tree/..." URLs return HTML
    # pages, not the raw file contents.
    base_url = "https://raw.githubusercontent.com/neuroquery/neuroquery_data/main/"
    tfidf_url = base_url + "neuroquery_model/corpus_tfidf.npz"
    coordinates_url = base_url + "data/data-neuroquery_version-1_coordinates.tsv.gz"
    feature_names_url = base_url + "neuroquery_model/vocabulary.csv"
    study_ids_url = base_url + "neuroquery_model/corpus_metadata.csv"
    out_dir = data_dir / "neuroquery"
    os.makedirs(out_dir, exist_ok=True)
    (
        tfidf_fn,
        coordinates_fn,
        feature_names_fn,
        study_ids_fn,
    ) = nilearn.datasets.utils._fetch_files(
        out_dir,
        [
            ("corpus_tfidf.npz", tfidf_url, dict()),
            ("data-neuroquery_version-1_coordinates.tsv.gz", coordinates_url, dict()),
            ("vocabulary.csv", feature_names_url, dict()),
            ("corpus_metadata.csv", study_ids_url, dict()),
        ],
    )
    # corpus_tfidf.npz stores a scipy sparse matrix; np.load() would only
    # return an NpzFile archive with no .todense() method.
    tfidf = scipy.sparse.load_npz(tfidf_fn)
    coordinates = pd.read_csv(coordinates_fn, sep="\t")
    assert coord_type in ("xyz", "ijk")
    coord_cols = list(coord_type)
    to_concat = list()
    for pmid, dfg in coordinates.groupby("id"):
        coords = dfg[["x", "y", "z"]]
        if coord_type == "ijk":
            coords = coords_to_voxels(coords, mask=mask)
        df = pd.DataFrame(coords, columns=coord_cols)
        df["id"] = pmid
        to_concat.append(df[coord_cols + ["id"]])
    coordinates = pd.concat(to_concat)
    peak_reported = coordinates.rename(columns={"id": "study_id"})
    feature_names = pd.read_csv(feature_names_fn, header=None)
    study_ids = pd.read_csv(study_ids_fn, header=None)
    study_ids.rename(columns={0: "study_id"}, inplace=True)
    tfidf = pd.DataFrame(tfidf.todense(), columns=feature_names[0])
    tfidf["study_id"] = study_ids.iloc[:, 0]
    melted = pd.melt(
        tfidf,
        var_name="term",
        id_vars="study_id",
        value_name="tfidf",
    )
    if tfidf_threshold is None:
        term_data = melted.query("tfidf > 0")[["term", "tfidf", "study_id"]]
    else:
        term_data = melted.query(f"tfidf > {tfidf_threshold}")[["term", "study_id"]]
    return term_data, peak_reported, study_ids


def fetch_difumo_meta(
    data_dir: Path = DATA_DIR,
    n_components: int = 256,
) -> pd.DataFrame:
    """Download (if needed) the DiFuMo label metadata and return it.

    Parameters
    ----------
    data_dir : base directory under which ``difumo/`` is created.
    n_components : DiFuMo dictionary size; must be a key of
        ``DIFUMO_N_COMPONENTS_TO_DOWNLOAD_ID``.

    Returns
    -------
    pandas DataFrame with one row per DiFuMo component.
    """
    target_dir = data_dir / "difumo"
    osf_id = DIFUMO_N_COMPONENTS_TO_DOWNLOAD_ID[n_components]
    archive_url = f"https://osf.io/{osf_id}/download"
    labels_file = os.path.join(
        str(n_components), f"labels_{n_components}_dictionary.csv"
    )
    fetched = nilearn.datasets.utils._fetch_files(
        target_dir,
        [(labels_file, archive_url, {"uncompress": True})],
        verbose=2,
    )
    return pd.read_csv(fetched[0])


def fetch_difumo(
    mask: nilearn.input_data.NiftiMasker,
    component_filter_fun: Callable = lambda _: True,
    data_dir: Path = DATA_DIR,
    coord_type: str = "xyz",
    n_components: int = 256,
    resolution: str = "3mm"
) -> Tuple[pd.DataFrame, nibabel.Nifti1Image]:
    """Download the DiFuMo maps and return their voxels as a long table.

    Parameters
    ----------
    mask : image/masker the probabilistic maps are resampled onto.
    component_filter_fun : predicate applied to each row of the label
        table to select which components are extracted.
    data_dir : base directory for the download cache.
    coord_type : "xyz" (world coordinates) or "ijk" (voxel indices).
    n_components : DiFuMo dictionary size.
    resolution : map resolution folder to load (e.g. "2mm", "3mm").

    Returns
    -------
    region_voxels : DataFrame with columns region, x/y/z (or i/j/k), weights.
    labels : the full (unfiltered) DiFuMo label table.
    """
    out_dir = data_dir / "difumo"
    download_id = DIFUMO_N_COMPONENTS_TO_DOWNLOAD_ID[n_components]
    url = f"https://osf.io/{download_id}/download"
    csv_file = os.path.join(
        str(n_components), f"labels_{n_components}_dictionary.csv"
    )
    nifti_file = os.path.join(str(n_components), "%s/maps.nii.gz" % resolution)
    files = [
        (csv_file, url, {"uncompress": True}),
        (nifti_file, url, {"uncompress": True}),
    ]
    files = nilearn.datasets.utils._fetch_files(out_dir, files, verbose=2)
    labels = pd.read_csv(files[0])
    img = nilearn.image.load_img(files[1])
    img = nilearn.image.resample_to_img(
        img,
        target_img=mask,
        interpolation="nearest",
    )
    img_data = img.get_fdata()
    to_concat = list()
    selected = labels.loc[labels.apply(component_filter_fun, axis=1)]
    # Iterate with the *original* row index: the previous enumerate() used
    # the position within the filtered subset as the 4th-dimension
    # component index, which is wrong whenever the filter drops rows.
    for component_idx, label in selected.Difumo_names.items():
        imj = img_data[:, :, :, component_idx]
        coordinates = np.where(imj > 0)
        dataimg = imj[coordinates].flatten()
        if coord_type == "xyz":
            coordinates = nibabel.affines.apply_affine(
                img.affine, np.vstack(coordinates).T.astype("int")
            )
            region_data = pd.DataFrame(np.vstack(coordinates), columns=list(coord_type))
        else:
            assert coord_type == "ijk"
            region_data = pd.DataFrame(np.vstack(coordinates).T, columns=list(coord_type))

        region_data["region"] = label
        region_data["weights"] = dataimg
        to_concat.append(region_data[["region"] + list(coord_type) + ["weights"]])
    region_voxels = pd.concat(to_concat)
    return region_voxels, labels


def fetch_schaefer(
    mask: nilearn.input_data.NiftiMasker,
    component_filter_fun: Callable = lambda _: True,
    data_dir: Path = DATA_DIR,
    coord_type: str = "xyz",
    n_components: int = 1000,
) -> Tuple[pd.DataFrame, nibabel.Nifti1Image]:
    """Load the Schaefer 2018 atlas and return each region's voxels.

    Parameters
    ----------
    mask : masker/image whose affine the atlas is resampled onto.
    component_filter_fun : unused; kept for signature compatibility with
        the other atlas fetchers.
    data_dir : unused; kept for signature compatibility.
    coord_type : "xyz" (world coordinates) or "ijk" (voxel indices).
    n_components : number of Schaefer ROIs to fetch.

    Returns
    -------
    region_voxels : DataFrame with columns label, x/y/z (or i/j/k).
    labels : the atlas label array (note: nilearn returns bytes labels).
    """
    files = nilearn.datasets.fetch_atlas_schaefer_2018(n_rois=n_components)
    labels = files["labels"]
    img = nib.load(files.maps)
    img = nilearn.image.resample_img(
        img,
        target_affine=mask.affine,
        interpolation="nearest",
    )
    img_data = img.get_fdata()
    to_concat = list()
    # Atlas value 0 is background and labels[0] describes region value 1,
    # so enumerate from 1; the previous loop paired labels[i] with value i,
    # shifting every region's label by one (and labelling background).
    for region_value, label in enumerate(labels, start=1):
        coordinates = np.argwhere(img_data[:, :, :] == region_value)
        if coord_type == "xyz":
            coordinates = nibabel.affines.apply_affine(img.affine, coordinates)
        else:
            assert coord_type == "ijk"
        region_data = pd.DataFrame(coordinates, columns=list(coord_type))
        region_data["label"] = label
        to_concat.append(region_data[["label"] + list(coord_type)])
    region_voxels = pd.concat(to_concat)
    return region_voxels, labels


def fetch_yeo(
    mask: nilearn.input_data.NiftiMasker,
    component_filter_fun: Callable = lambda _: True,
    data_dir: Path = DATA_DIR,
    coord_type: str = "xyz",
) -> Tuple[pd.DataFrame, nibabel.Nifti1Image]:
    """Load the Yeo 2011 7-network atlas and return each network's voxels.

    Parameters
    ----------
    mask : masker/image whose affine the atlas is resampled onto.
    component_filter_fun : unused; kept for signature compatibility with
        the other atlas fetchers.
    data_dir : unused; kept for signature compatibility.
    coord_type : "xyz" (world coordinates) or "ijk" (voxel indices).

    Returns
    -------
    region_voxels : DataFrame with columns label, x/y/z (or i/j/k).
    labels : list of the seven network names.
    """
    files = nilearn.datasets.fetch_atlas_yeo_2011()["thick_7"]
    labels = ["Visual", "SomatoSensory", "Dorsal Attention", "Ventral Attention",
              "Limbic", "Executive Control", "Default Mode"]
    img = nib.load(files)
    img = nilearn.image.resample_img(
        img,
        target_affine=mask.affine,
        interpolation="nearest",
    )
    img_data = img.get_fdata()
    to_concat = list()
    # Atlas value 0 is background; networks are values 1..7 and labels[0]
    # ("Visual") describes value 1.  The previous `if i > 0` loop dropped
    # "Visual" entirely and shifted every remaining label by one network
    # (e.g. visual voxels were labelled "SomatoSensory").
    for network_value, label in enumerate(labels, start=1):
        coordinates = np.argwhere(img_data[:, :, :] == network_value)
        if coord_type == "xyz":
            coordinates = nibabel.affines.apply_affine(img.affine, coordinates)
        else:
            assert coord_type == "ijk"
        region_data = pd.DataFrame(coordinates, columns=list(coord_type))
        region_data["label"] = label
        to_concat.append(region_data[["label"] + list(coord_type)])
    region_voxels = pd.concat(to_concat)
    return region_voxels, labels


def save_to_hdf(d: pd.DataFrame, dst_path: Path) -> None:
    """Write *d* under the key "data" to an HDF5 store at *dst_path*.

    The store is created fresh (mode="w") with blosc:lz4 compression at
    the maximum level.
    """
    print(f"saving to {dst_path}")
    store = pd.HDFStore(dst_path, mode="w", complib="blosc:lz4", complevel=9)
    with store:
        store["data"] = d


def get_exp_dir(exp_name: str):
    """Return the experiment directory named *exp_name* next to this module.

    Raises
    ------
    FileNotFoundError
        If no directory with that name exists alongside this file.
    """
    exp_dir = Path(__file__).parent / exp_name
    if exp_dir.is_dir():
        return exp_dir
    raise FileNotFoundError(
        f"Unknown exp: exp dir {exp_dir} does not exist"
    )


def load_results(
    exp_name: str,
    out_path: Optional[Union[str, Path]] = None,
) -> pd.DataFrame:
    """Load (and cache) the concatenated results of an experiment.

    If a cached ``<exp_name>-results*.h5`` file already exists it is
    loaded directly; otherwise every ``*.h5`` file in the experiment's
    ``_results`` directory is concatenated, cached under a dated
    filename, and returned.

    Parameters
    ----------
    exp_name : experiment directory name (resolved via get_exp_dir).
    out_path : optional extra path the concatenated results are saved to.

    Raises
    ------
    FileNotFoundError
        If the experiment or its ``_results`` directory does not exist.
    """
    exp_dir = get_exp_dir(exp_name)
    result_dir = exp_dir / "_results"
    if not result_dir.is_dir():
        raise FileNotFoundError(
            f"Results not available for exp {exp_name}. "
            f"Directory {exp_dir} does not exist"
        )
    cache_paths = list(result_dir.glob(f"{exp_name}-results*.h5"))
    if cache_paths:
        # Path.glob already yields paths prefixed with result_dir; joining
        # result_dir again duplicated the prefix for relative paths.
        return pd.read_hdf(cache_paths[0], "data")
    if out_path is not None and isinstance(out_path, str):
        out_path = Path(out_path)
    to_concat = list()
    for path in result_dir.glob("*.h5"):
        to_concat.append(pd.read_hdf(path, "data"))
    results = pd.concat(to_concat)
    datestr = datetime.date.today().isoformat()
    cache_path = result_dir / f"{exp_name}-results-{datestr}.h5"
    # Replace any stale cache file from the same day.
    if cache_path.is_file():
        cache_path.unlink()
    save_to_hdf(results, cache_path)
    if out_path is not None:
        save_to_hdf(results, out_path)
    return results


def load_aggregated_results(exp_name: str) -> pd.DataFrame:
    """Load the cached aggregated results of an experiment.

    Raises
    ------
    FileNotFoundError
        If the ``_results`` directory or the aggregated cache is missing.
    """
    exp_dir = get_exp_dir(exp_name)
    result_dir = exp_dir / "_results"
    if not result_dir.is_dir():
        raise FileNotFoundError(
            f"Results not available for exp {exp_name}. "
            f"Directory {exp_dir} does not exist"
        )
    cache_paths = list(result_dir.glob(f"{exp_name}-aggregated-results*.h5"))
    if not cache_paths:
        raise FileNotFoundError(
            f"Aggregated results not available in {result_dir}"
        )
    print("loading cached aggregated results from")
    print(cache_paths[0])
    # Path.glob already yields paths prefixed with result_dir; joining
    # result_dir a second time duplicated the prefix for relative paths.
    return pd.read_hdf(cache_paths[0], "data")


def save_aggregated_results(exp_name: str, results: pd.DataFrame) -> None:
    """Save *results* as the dated aggregated-results file of an experiment.

    Raises
    ------
    FileNotFoundError
        If the experiment's ``_results`` directory does not exist.
    """
    exp_dir = get_exp_dir(exp_name)
    result_dir = exp_dir / "_results"
    if not result_dir.is_dir():
        raise FileNotFoundError(f"Directory {exp_dir} does not exist")
    dated_name = (
        f"{exp_name}-aggregated-results-{datetime.date.today().isoformat()}.h5"
    )
    destination = result_dir / dated_name
    save_to_hdf(results, destination)
    print("saved aggregated results to")
    print(destination)


def load_cognitive_terms(filename: str) -> pd.Series:
    """Load a newline-separated list of cognitive terms.

    Parameters
    ----------
    filename : str or None
        Basename (without ".txt") of a term file located next to this
        module; when None, falls back to "cognitive_terms.txt".

    Returns
    -------
    pandas DataFrame with a single "term" column, duplicates removed.
    """
    if filename is None:
        path = Path(__file__).parent / "cognitive_terms.txt"
    else:
        # The previous code interpolated a literal placeholder instead of
        # the argument, so custom term files could never be loaded.
        path = Path(__file__).parent / f"{filename}.txt"
    return pd.read_csv(path, header=None, names=["term"]).drop_duplicates()


# +
from dipy.align.imwarp import SymmetricDiffeomorphicRegistration
from dipy.align.imwarp import DiffeomorphicMap
from dipy.align.metrics import CCMetric

def transform_to_symmetric_template(coords):
    """Warp MNI peaks from the asymmetric to the symmetric ICBM152 template.

    Computes a diffeomorphic registration (dipy SyN) from the asymmetric
    GM template to the symmetric one, then, for each study, rasterizes the
    peaks into a volume, warps that volume, and reads the surviving voxel
    positions back out as world coordinates.

    Parameters
    ----------
    coords : pandas DataFrame with columns "x", "y", "z" and "study_id".

    Returns
    -------
    pandas DataFrame with warped "x", "y", "z" and "study_id" columns.

    Notes
    -----
    Expects the two template NIfTI files to exist in the working
    directory.  Peaks that land on the same voxel are implicitly
    deduplicated by the rasterization step.
    """
    # `image.resample_img` previously referenced an undefined name
    # (`image` was never imported); use the nilearn.image module that is
    # imported at the top of this file.
    moving_img = nilearn.image.resample_img(
        nib.load('mni_icbm152_gm_tal_nlin_asym_09a.nii'),
        np.eye(3) * 3, interpolation="nearest")

    template_img = nilearn.image.resample_img(
        nib.load('mni_icbm152_gm_tal_nlin_sym_09a.nii'),
        np.eye(3) * 3, interpolation="nearest")

    moving_data = moving_img.get_fdata()
    moving_affine = moving_img.affine
    template_data = template_img.get_fdata()
    template_affine = template_img.affine

    # The mismatch metric (cross-correlation, 3D).
    metric = CCMetric(3)
    # The optimization strategy: iterations per resolution level.
    level_iters = [10, 10, 5]
    # Registration object.
    sdr = SymmetricDiffeomorphicRegistration(metric, level_iters)

    mapping = sdr.optimize(static=template_data, moving=moving_data,
                           static_grid2world=template_affine,
                           moving_grid2world=moving_affine)
    to_concat = list()
    for study_id, row in coords.groupby("study_id"):
        # World -> voxel indices in the moving (asymmetric) template.
        ijk_positions = np.round(
            nib.affines.apply_affine(
                np.linalg.pinv(moving_img.affine),
                row[["x", "y", "z"]].values.astype("int"),
            )
        ).astype(int)
        data_obj = np.zeros_like(moving_img.get_fdata())
        data_obj[tuple(ijk_positions.T)] = 1
        warped_moving = mapping.transform(data_obj, interpolation="nearest")
        ijk_warped = np.where(warped_moving == 1)
        # Voxel -> world coordinates in the symmetric template.
        xyz_coordinates = nib.affines.apply_affine(template_img.affine, np.c_[ijk_warped])
        df = pd.DataFrame(xyz_coordinates, columns=["x", "y", "z"])
        df["study_id"] = study_id
        to_concat.append(df)
    sym_coords = pd.concat(to_concat)
    return sym_coords.reset_index(drop=True)


# +
import random
from sklearn.utils import shuffle
def create_coordinate_dataset(foci):
    """Generate random foci matching the per-study counts of *foci*.

    For each study in *foci*, draws the same number of voxels uniformly
    at random (without replacement) from the brain mask and converts them
    to world (MNI) coordinates.  The previous docstring was copied from
    nimare's ``create_coordinate_dataset`` and described parameters and
    return values this function does not have.

    Parameters
    ----------
    foci : pandas DataFrame
        Observed foci with a "study_id" column; only the per-study row
        counts are used.

    Returns
    -------
    pandas DataFrame
        Columns "x", "y", "z" with one random coordinate per original
        focus.  Rows are not labelled with study_id, and no random seed
        is set, so results differ between calls.

    Notes
    -----
    Expects "MNI152_T1_2mm_brain_mask.nii.gz" in the working directory.
    """
    template_img = nilearn.image.resample_img(
        nib.load("MNI152_T1_2mm_brain_mask.nii.gz"),
        np.eye(3) * 3, interpolation="nearest")

    template_data = template_img.get_fdata()
    # Candidate voxels: every non-zero voxel of the brain mask.
    possible_ijks = np.argwhere(template_data)

    num_foci = foci.groupby('study_id').size().tolist()
    ground_truth_foci_ijks = []
    for n in num_foci:
        foci_idxs = np.random.choice(range(len(possible_ijks)), n, replace=False)
        ground_truth_foci_ijks.append(possible_ijks[foci_idxs])

    ground_truth_foci_ijks = np.vstack(ground_truth_foci_ijks)
    ground_truth_foci_xyz = vox2mm(ground_truth_foci_ijks, template_img.affine)
    return pd.DataFrame(ground_truth_foci_xyz, columns=["x", "y", "z"])

def mm2vox(xyz, affine):
    """Convert world-space coordinates to voxel subscripts.

    Parameters
    ----------
    xyz : (X, 3) :obj:`numpy.ndarray`
        World-space coordinates, one row per point (x, y, z).
    affine : (4, 4) :obj:`numpy.ndarray`
        Affine matrix of the image.

    Returns
    -------
    (X, 3) :obj:`numpy.ndarray`
        Voxel subscripts, truncated to int.

    Notes
    -----
    See
    http://blog.chrisgorgolewski.org/2014/12/how-to-convert-between-voxel-and-mm.html
    """
    inverse_affine = np.linalg.inv(affine)
    return nib.affines.apply_affine(inverse_affine, xyz).astype(int)

def vox2mm(ijk, affine):
    """Convert voxel subscripts to world-space coordinates.

    Parameters
    ----------
    ijk : (X, 3) :obj:`numpy.ndarray`
        Voxel subscripts, one row per point (i, j, k).
    affine : (4, 4) :obj:`numpy.ndarray`
        Affine matrix of the image.

    Returns
    -------
    (X, 3) :obj:`numpy.ndarray`
        World-space coordinates.

    Notes
    -----
    See
    http://blog.chrisgorgolewski.org/2014/12/how-to-convert-between-voxel-and-mm.html
    """
    return nib.affines.apply_affine(affine, ijk)


# -

import numpy as np
from matplotlib import cm 
from matplotlib import colors as _colors
from matplotlib import rcParams 
def alpha_cmap(color, name='', alpha_min=0.2, alpha_max=0.5):
    """ Return a colormap with the given color, and alpha going between two values.
    Parameters
    ----------
    color : (r, g, b), or a string
        A triplet of floats ranging from 0 to 1, or a matplotlib
        color string.
    name : string, optional
        Name of the colormap. Default=''.
    alpha_min : Float, optional
        Minimum value for alpha. Default=0.2.
    alpha_max : Float, optional
        Maximum value for alpha. Default=0.5.
    """
    red, green, blue = _colors.colorConverter.to_rgb(color)
    # If the caller passed a color *string* and no explicit name, reuse
    # the color string as the colormap's name.
    if name == '' and hasattr(color, 'startswith'):
        name = color
    # Two identical RGBA stops: the color is constant across the map and
    # only the alpha channel (set below) varies.
    cmapspec = [(red, green, blue, 1.),
                (red, green, blue, 1.),
               ]
    cmap = _colors.LinearSegmentedColormap.from_list(
        '%s_transparent' % name, cmapspec, rcParams['image.lut'])
    # NOTE(review): relies on private matplotlib colormap internals
    # (_init/_lut); may break with newer matplotlib versions — verify.
    cmap._init()
    # Ramp alpha linearly from alpha_min to alpha_max across the lookup
    # table...
    cmap._lut[:, -1] = np.linspace(alpha_min, alpha_max, cmap._lut.shape[0])
    # ...but force the final LUT entry to be fully transparent.
    cmap._lut[-1, -1] = 0
    return cmap


def truncate_colormap(cmap, minval=0.0, maxval=1.0, n=100):
    """Return a colormap restricted to the [minval, maxval] slice of *cmap*.

    Samples *n* colors from the requested sub-range of *cmap* and builds a
    new LinearSegmentedColormap from them.
    """
    sampled_colors = cmap(np.linspace(minval, maxval, n))
    truncated_name = 'trunc({n},{a:.2f},{b:.2f})'.format(
        n=cmap.name, a=minval, b=maxval)
    return LinearSegmentedColormap.from_list(truncated_name, sampled_colors)


def merge_difumo_network_label(
    d: pd.DataFrame,
    difumo_meta: pd.DataFrame,
    dst_col: str = "network_yeo17",
    network_col: str = "Yeo_networks17",
) -> pd.DataFrame:
    """Attach each row's network label looked up from the DiFuMo metadata.

    Adds a *dst_col* column to *d* (in place) by mapping ``d["region"]``
    through the ``Difumo_names`` -> *network_col* correspondence of
    *difumo_meta*; regions with no match get ``None``.

    Raises
    ------
    ValueError
        If *network_col* is not a column of *difumo_meta*.
    """
    if network_col not in difumo_meta.columns:
        raise ValueError(f"Unknown network column name: {network_col}")
    region_to_network = dict(
        zip(difumo_meta["Difumo_names"], difumo_meta[network_col])
    )
    d[dst_col] = d["region"].apply(region_to_network.get)
    return d
# (end of file; "back to top" web-scrape artifact removed)