Raw File
step2a_data_quality-head_position.py
#!/usr/bin/env python3

"""
@ Lina Teichmann

    INPUTS: 
    call from command line with following inputs: 
        -bids_dir

    OUTPUTS:
    Plots for head motion data.
    If it doesn't exist, the script makes a figures folder in the BIDS derivatives folder
    

    NOTES: 
    This script is using the pyctf toolbox to extract electrode positions from the three sensors (nasion, left, right) that were mounted to the participants head.
    (the pyctf toolbox can be downloaded from the nih-megcore github: https://github.com/nih-megcore/pyctf)
    
    Once the electrode positions are extracted over time, we calculated the distance between the measured head positions.  We then report the average distance within the session and across the session. 
    Note that extra care should be taken when the head coil measurements are being used for source localization: it seems that for participant #4 there are a few sessions where the position recording mal-functioned or where the coil wasn't attached to the same position.  


"""

from pyctf import dsopen
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
import numpy as np
import pandas as pd
import itertools, os, sys
import seaborn as sns

#*****************************#
### PARAMETERS ###
#*****************************#
n_participants              = 4
n_runs                      = 10
n_sessions                  = 12
colors                      = ['mediumseagreen','steelblue','goldenrod','indianred','grey']
electrodes                  = ['nas','lpa','rpa']
electrode_labels            = ['Nasion','LPA','RPA']
ppt_labels                  = ['M1','M2','M3','M4']
plt.rcParams['font.size']   = '16'
plt.rcParams['font.family'] = 'Helvetica'


#*****************************#
### HELPER FUNCTIONS ###
#*****************************#

def lighten_color(color, amount=0.5):
    import matplotlib.colors as mc
    import colorsys
    try:
        c = mc.cnames[color]
    except:
        c = color
    c = colorsys.rgb_to_hls(*mc.to_rgb(c))
    return colorsys.hls_to_rgb(c[0], 1 - amount * (1 - c[1]), c[2])


def load_sensor_positions(rootdir, recording_dir = ['x','y','z'], n_participants = 4, n_session = 12, n_runs = 10):
    # un-usable recordings of head coil position based on notes by experimenter (each row is a participant, each tuple is (session, run))
    invalid_measures = [[],
        [(8,3)],
        [],
        [(4,4),(5,2),(7,7),(7,8),(7,9),(12,5),(12,10)]]

    # initialize data frame
    df                  = pd.DataFrame(columns=['participant','session','run']+[i + '_' + ii for i in electrodes for ii in recording_dir])
    df.participant      = np.repeat(np.arange(1,n_participants+1),n_sessions*n_runs)
    df.session          = np.tile(np.repeat(np.arange(1,n_sessions+1),n_runs),n_participants)
    df.run              = np.tile(np.arange(1,n_runs+1),n_sessions*n_participants)

    for p in range(1,n_participants+1):
        for s in range(1,n_sessions+1):
            for r in range(1,n_runs+1):
                meg_fn = f'{rootdir}/sub-BIGMEG{str(p)}/ses-{str(s).zfill(2)}/meg/sub-BIGMEG{str(p)}_ses-{str(s).zfill(2)}_task-main_run-{str(r).zfill(2)}_meg.ds'
                for i,v in enumerate(electrodes):
                    filter_col = [col for col in df if col.startswith(v)]
                    df.loc[(df.participant==p)&(df.session==s)&(df.run==r),filter_col]  = dsopen(meg_fn).dewar[i]

        # deleting all entries from invalid measurements
        for ii in invalid_measures[p-1]:
            for i,v in enumerate(electrodes):
                filter_col = [col for col in df if col.startswith(v)]
                df.loc[(df['participant']==p) & (df['session']==ii[0]) & (df['run']==ii[1]),filter_col] = np.nan

    return df


def plot_rdm(res,ax,cbar, fig):
    im                  = ax.imshow(res,interpolation='none', cmap='flare',aspect='equal',vmin=0,vmax=5)
    major_ticks         = np.arange(-.5, len(res)-1, 10)
    minor_ticks         = np.arange(-.5, len(res)-1)
    ax.tick_params(axis = 'both', which = 'major', labelsize = 10)
    ax.tick_params(axis = 'both', which = 'minor', labelsize = 0)
    ax.set_xticks(major_ticks)
    ax.set_xticks(minor_ticks, minor = True)
    ax.set_yticks(major_ticks)
    ax.set_yticks(minor_ticks, minor = True)

    ax.set_xticklabels(['S'+ str(i) for i in range(1,n_sessions+1)],rotation=45)
    ax.set_yticklabels(['S'+ str(i) for i in range(1,n_sessions+1)])
    ax.grid(which = 'major', alpha = 0.9, color='w')
    ax.grid(which = 'minor', alpha = 0.2, color='w')
    ax.spines['right'].set_visible(False)
    ax.spines['top'].set_visible(False)

    if cbar: 
        cbar_ax         = fig.add_axes([0.92, 0.4, 0.01, 0.2])
        cbar            = fig.colorbar(im, cax=cbar_ax)
        cbar.set_label('Distance (mm)', fontsize=16)


def make_supplementary_plot(df):
    fig, ax             = plt.subplots(n_participants,len(electrodes),figsize=(10,20))
    for p in range(1,n_participants+1):
        tmp             = df.loc[df.participant==p,:].copy()
        tmp['id']       = ['S' + str(tmp.session.to_list()[i]) + '_' + 'R' + str(tmp.run.to_list()[i]) for i in range(len(tmp))]
        # get all possible pairwise comparisons
        combs           = list(itertools.combinations(tmp['id'].to_list(), 2))
        res             = np.zeros((len(tmp),len(tmp)))
        res[res == 0.0] = np.nan
        res             = pd.DataFrame(res,columns = tmp['id'].to_numpy(),index = tmp['id'].to_numpy())
        
        # loop over electrodes and calculate distances and plot
        for i,v in enumerate(electrodes):
            filter_col  = [col for col in df if col.startswith(v)]
            res1        = res.copy()
            for vv in combs:
                res1.loc[vv[1],vv[0]]=np.sqrt(((tmp.loc[tmp.id==vv[0],filter_col].to_numpy()-tmp.loc[tmp.id==vv[1],filter_col].to_numpy())**2).sum())

            if (p==4) & (i==2):
                plot_rdm(res1,ax[p-1][i],True,fig) # plot with colorbar
            else:
                plot_rdm(res1,ax[p-1][i],False,fig) # plot without colorbar

    # label the rows and columns
    for a, col in zip(ax[0], electrode_labels):
        a.annotate(col, xy=(0.5, 1), xytext=(0, 5),
                    xycoords='axes fraction', textcoords='offset points',
                    size='large', ha='center', va='baseline')

    for a, row in zip(ax[:,0], ppt_labels):
        a.annotate(row, xy=(0, 0.5), xytext=(-a.yaxis.labelpad - 5, 0),
                    xycoords=a.yaxis.label, textcoords='offset points',
                    size='large', ha='right', va='center')

    fig.subplots_adjust(left=0.15, top=0.95,right=0.9,hspace=0)

    # save
    fig.savefig(figdir + '/supplementary_motion.pdf')

    return res, res1

def make_boxplot(df,res, res1):
    # initialize
    cross_all                       = np.zeros((n_sessions*n_runs*n_sessions*n_runs,n_participants))
    within_all                      = np.zeros((n_sessions*n_runs*n_sessions*n_runs,n_participants))

    # make masks to filter the distance matrix to extract within and cross-session differences

    within_mask                     = ~res.copy().isna()
    cross_mask                      = ~res.copy().isna()
    for s in range(1,n_sessions+1):
        filter_row                  = [col for col in res1 if col.startswith('S'+str(s)+'_')]
        within_mask.loc[filter_row,filter_row] = True

        filter_col                  = [col for col in res1 if not col.startswith('S'+str(s)+'_')]
        cross_mask.loc[filter_row,filter_col] = True
        cross_mask.loc[filter_col,filter_row] = True

    # loop over participants and calculate the distances for cross- and within-session comparisons
    for p in range(1,n_participants+1):
        tmp                         = df.loc[df.participant==p,:].copy()
        # average three sensors to find midpoint
        for i,v in enumerate(['x','y','z']):
            filter_col              = [col for col in tmp if col.endswith(v)]
            tmp[v]                  = tmp[filter_col].mean(axis=1)
        # label the sessions/runs
        tmp['id']                   = ['S' + str(tmp.session.to_list()[i]) + '_' + 'R' + str(tmp.run.to_list()[i]) for i in range(len(tmp))]
        # find all combinations between all measurement pairs and make a matrix that has all pairwise distances
        combs                       = list(itertools.combinations(tmp['id'].to_list(), 2))
        res                         = np.zeros((len(tmp),len(tmp)))
        res[res == 0.0]             = np.nan
        res                         = pd.DataFrame(res,columns = tmp['id'].to_numpy(),index = tmp['id'].to_numpy())
        res1                        = res.copy()
        for vv in combs:
            res1.loc[vv[1],vv[0]]   = np.sqrt(((tmp.loc[tmp.id==vv[0],['x','y','z']].to_numpy()-tmp.loc[tmp.id==vv[1],['x','y','z']].to_numpy())**2).sum())

        # use the mask to extract the cross-session and within-session distances
        cross_all[:,p-1]            = (res1[cross_mask].to_numpy()*10).ravel()  
        within_all[:,p-1]           = (res1[within_mask].to_numpy()*10).ravel()

    # make the boxplot with cross- and within-session distances
    fig, ax                 = plt.subplots(1,1)
    for p in range(n_participants):
            x               = within_all[:,p]
            boxplot         = ax.boxplot(x[~np.isnan(x)],sym='',whis=(0,90),notch=True,patch_artist=True,widths=0.25,positions = [p-0.15],
                                    boxprops=dict(facecolor=(colors[p]), color='k'),
                                    medianprops=dict(color='k',lw=1))

            x               = cross_all[:,p]
            boxplot         = ax.boxplot(x[~np.isnan(x)],sym='',whis=(0,90),notch=True,patch_artist=True,widths=0.25,positions = [p+0.15],
                                    boxprops=dict(facecolor=lighten_color(colors[p],amount=0.3), color='k'),
                                    medianprops=dict(color='k',lw=1))

    # make plot look pretty
    ax.set_xticks(np.arange(n_participants))
    ax.set_xticklabels(['M' + str(p+1) for p in np.arange(n_participants)])

    caps                    = boxplot['caps']
    med                     = boxplot['medians'][0]
    xpos                    = med.get_xdata()
    xoff                    = 0.10 * (xpos[1] - xpos[0])
    xlabel                  = xpos[1] + xoff
    capbottom               = caps[0].get_ydata()[0]
    captop                  = caps[1].get_ydata()[0]

    ax.text(xlabel, capbottom,
            '5th percentile', va='center')
    ax.text(xlabel, captop,
            '95th percentile', va='center')

    ax.set_ylabel('Head coil movement (mm)')
    ax.spines['right'].set_visible(False)
    ax.spines['top'].set_visible(False)

    within_patch            = Patch(facecolor=[0.5,0.5,0.5])
    cross_patch             = Patch(facecolor=lighten_color([0.5,0.5,0.5],amount=0.3))

    ax.legend([within_patch,cross_patch],['within-session ','cross-session'],frameon=False,loc='upper left')
    ax.set_ylim([-0.1,10])

    # save
    fig.subplots_adjust(right=0.8)
    fig.savefig(figdir+'/data_quality-motion-box.pdf')


#*****************************#
### COMMAND LINE INPUTS ###
#*****************************#
if __name__=='__main__':
    import argparse
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "-bids_dir",
        required=True,
        help='path to bids root',
    )


    args = parser.parse_args()
    rootdir                     = args.bids_dir
    sourcedata_dir              = f'{rootdir}/sourcedata/'

    figdir                      = f'{rootdir}/derivatives/figures/'
    if not os.path.exists(figdir):
        os.makedirs(figdir)

    ####### Run ########
    df = load_sensor_positions(rootdir, recording_dir = ['x','y','z'], n_participants = 4, n_session = 12, n_runs = 10)
    res, res1 = make_supplementary_plot(df)
    make_boxplot(df,res, res1)
back to top