Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

https://gitlab.com/tgwgraham/papacode_v2
30 September 2022, 14:34:50 UTC
  • Code
  • Branches (2)
  • Releases (0)
  • Visits
    • Branches
    • Releases
    • HEAD
    • refs/heads/main
    • refs/heads/tgwgraham-main-patch-61314
    • 77dbcca3d4ca2833a5d051d495d52444ba34ac27
    No releases to show
  • dd6f73e
  • /
  • papa_utils.py
Raw File Download
Take a new snapshot of a software origin

If the archived software origin currently browsed is not synchronized with its upstream version (for instance when new commits have been issued), you can explicitly request Software Heritage to take a new snapshot of it.

Use the form below to proceed. Once a request has been submitted and accepted, it will be processed as soon as possible. You can then check its processing state by visiting this dedicated page.
swh spinner

Processing "take a new snapshot" request ...

Permalinks

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • content
  • directory
  • revision
  • snapshot
origin badgecontent badge Iframe embedding
swh:1:cnt:41257603aecf32b6a7ba42694b6be778578f6dc4
origin badgedirectory badge Iframe embedding
swh:1:dir:dd6f73e18369c2fc022f6882ec50b4852636816a
origin badgerevision badge
swh:1:rev:77dbcca3d4ca2833a5d051d495d52444ba34ac27
origin badgesnapshot badge
swh:1:snp:dac718c98b0b3a3d1be0a1317e553dd28df4d361
Citations

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • content
  • directory
  • revision
  • snapshot
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Tip revision: 77dbcca3d4ca2833a5d051d495d52444ba34ac27 authored by Thomas Graham on 29 September 2022, 16:03:20 UTC
Add LICENSE
Tip revision: 77dbcca
papa_utils.py
"""
papa_utils.py -- utilities for PAPA-SPT analysis

"""

import os

# Read config files in TOML format
import toml 
import pandas as pd
import numpy as np

def read_config(path):
    """
    Read a config file.

    args
        path    :   str

    returns
        dict

    """
    assert os.path.isfile(path), \
        "read_config: path %s does not exist" % path
    return toml.load(path)
    
    
def save_config(path, config):
    """
    Save data in a dict to a TOML file.

    args
        path    :   str
        config  :   dict

    """
    with open(path, 'w') as o:
        toml.dump(config, o)
 

def get_condition_fnum(currc):
    """
    get_condition_fnum(currc):
    
    args
        currc   :   dict corresponding to a single experimental condition
    
    returns
        list of file numbers associated with that condition
    
    """
    
    if 'fnum' in currc.keys():
        return currc['fnum']
    elif 'first' in currc.keys() and 'last' in currc.keys():
        fnum = list(range(currc['first'],currc['last']+1))
        if 'exclude' in currc.keys():
            fnum = [x for x in fnum if x not in currc['exclude']]
    else:
        raise Exception('File numbers must be defined for each condition in the input file '\
        'using either the \'fnum\' input or both the \'first\' and \'last\' inputs with '\
        'optional \'exclude\' input.') 
    return fnum
        
def list_movies(config):
    """
    List movie file names specified by condition list in config dictionary
    
    args
        config  :   dict
        
    returns
        list of file names
    
    """

    fnames = []

    # format string for movie file names
    formatstring = '%%s%%0%dd.%s' % (config['file_format']['ndigits'],config['file_format']['extension'])

    for c in config['conditions']:
        currc = config['conditions'][c]
        if 'basefname' not in currc.keys():
            raise Exception('You must include a basefname for every condition')
        else:
            fnum = get_condition_fnum(currc)
            for f in fnum:
                fnames.append(formatstring % (currc['basefname'],f))
                
    return fnames
          
    
def list_csvs(config):
    """
    List csv file names specified by condition list in config dictionary
    
    args
        config  :   dict
        
    returns
        list of one array per CSV file with elements:
            0: condition name
            1: basefname
            2: file number
            3: CSV file name
    
    """

    fnames = []

    # format string for movie file names
    formatstring = '%%s%%0%dd.csv' % config['file_format']['ndigits']

    for c in config['conditions']:
        currc = config['conditions'][c]
        if 'basefname' not in currc.keys():
            raise Exception('You must include a basefname for every condition')
        else:
            fnum = get_condition_fnum(currc)
            for f in fnum:
                fnames.append([c,currc['basefname'],f,formatstring % (currc['basefname'],f)])
                
    return fnames
    
    
def sort_PAPA_DR(config):
    """
    sort_PAPA_DR(config)
    
    Sort PAPA trajectories from DR trajectories given the input dictionary config.
    Write out to a new folder called "sortedTrajectories"
    
    args
        config  :   dict
        
    """
    
    # make output directory
    os.system('mkdir sortedTrajectories')
        
    # loop over tracked csv files and sort each one
    csvfnames = list_csvs(config)
    
    for f in csvfnames:
        # sort PAPA, DR, and other segments from this csv file
        write_sorted_segments(f,config,outf='sortedTrajectories')
        
def write_sorted_segments(f,config,outf='sortedTrajectories'):
    """
    write_sorted_segments(f,config):
    
    Write out new csv files corresponding to PAPA, DR, and other segments from 
    trajectory csv file f.
    
    By default, the output will be stored in folder sortedTrajectories. 
    Within the output directory, the function will create a subfolder for each experimental condition (i.e., keys of "conditions" sub-dictionary)
    Within each of these subfolders, it will create a subfolder for each distinct csv file associated with that condition.
    Finally, within that subfolder, it will write out files called PAPA.csv, DR.csv, and other.csv, corresponding to trajectories 
    for PAPA, DR, and other trajectories, respectively.
    
    args
        f - 3-element array containing base file name, file number, and CSV file name for a given CSV data file
        config - configuration settings (dict)
        outf - output directory name (string)
    
    """
    
    IS = config['illumination_sequence']
    ncycles = IS['ncycles']
    r = IS['r']
    v = IS['v']
    g = IS['g']
    fw = IS['framewindow']
    
    t = 4*r + v + g
    
    data = pd.read_csv(f[3])
    
    fwrange = np.arange(0,fw)
    
    gpost = np.arange(0,ncycles)*t + 3*r + v + g
    PAPAframes = gpost.reshape(ncycles,1) + fwrange
    PAPAframes = PAPAframes.flatten()
    
    vpost = np.arange(0,ncycles)*t + r + v
    DRframes = vpost.reshape(ncycles,1) + fwrange
    DRframes = DRframes.flatten()

    PAPAtraj = data[data['frame'].isin(PAPAframes)]
    DRtraj = data[data['frame'].isin(DRframes)]
    othertraj = data[~data['frame'].isin(PAPAframes) & ~data['frame'].isin(DRframes)]
    
    columns = ['y','x','I0','bg','y_err','x_err','I0_err','bg_err','H_det','error_flag', 
          'snr','rmse','n_iter','y_detect','x_detect','frame','loc_idx','trajectory',  
          'subproblem_n_traj','subproblem_n_locs']

    os.system('mkdir -p %s/%s/%s' % (outf,f[0],f[2]))
    PAPAtraj.to_csv('%s/%s/%s/PAPA.csv' % (outf,f[0],f[2]), index=False, columns=columns)
    DRtraj.to_csv('%s/%s/%s/DR.csv' % (outf,f[0],f[2]), index=False, columns=columns)
    othertraj.to_csv('%s/%s/%s/other.csv' % (outf,f[0],f[2]), index=False, columns=columns)


def makeSA_sorted(config,sortedFolder='sortedTrajectories',nworkers=1):
    """
    makeSAD_sorted(config,sortedFolder='sortedTrajectories',nworkers=1):
    
    Make StateArray for the entire dataset of sorted PAPA, DR, and other trajectories.
    
    Requires saspt (https://github.com/alecheckert/saspt)
    
    args
        config  :   dict - configuration settings
        sortedFolder    :   directory where sorted PAPA/DR/other trajectories are stored
        nworkers    :   number of workers to use for the computation; do not set this to
                        a value higher than the number of CPUs on your computer.
    
    """
    from saspt import StateArrayDataset, RBME
    
    PDO = ['PAPA','DR','other']
    
    settings = dict(
        likelihood_type = RBME,
        pixel_size_um = config['track']['pixel_size_um'],
        frame_interval = config['track']['frame_interval'],
        focal_depth = config['saspt']['focal_depth'],
        path_col = 'filepath',
        condition_col = 'condition',
        progress_bar = True,
        num_workers=nworkers,
    )
        
    conditions = []
    filepaths = []
    
    for c in config['conditions']:
        fnum = get_condition_fnum(config['conditions'][c])
        for f in fnum:
            for j in range(3):
                conditions.append('%s_%s' % (c,PDO[j]))
                filepaths.append('%s/%s/%s/%s.csv' % (sortedFolder,c,f,PDO[j]))
    
    paths = pd.DataFrame({'condition':conditions, 'filepath':filepaths})
    
    SAD = StateArrayDataset.from_kwargs(paths, **settings)

    return [SAD,paths]
    
def makeSA_sorted_sameN(config,sortedFolder='sortedTrajectories', 
    subsampledFolder='subsampledTrajectories',nworkers=1,randseed=None):
    """
    makeSA_sorted_sameN(config,sortedFolder='sortedTrajectories',nworkers=1,randseed=None):
    
    Make StateArray for the entire dataset of sorted PAPA, DR, and other trajectories.
    Identify the sorted category with the fewest trajectories, and randomly subsample the same number of trajectories from each of the other two categories.
    
    Requires saspt (https://github.com/alecheckert/saspt)
    
    args
        config  :   dict - configuration settings
        sortedFolder    :   directory where sorted PAPA/DR/other trajectories are stored
        subsampledFolder    :   directory where subsampled trajectories will be stored
        nworkers    :   number of workers to use for the computation; do not set this to
                        a value higher than the number of CPUs on your computer.
        randseed    :   (optional) number to serve as a random seed to initialize the          
                        random number generator (for reproducing the same "random" output when
                        the code is re-run)
    
    """
    
    from saspt import StateArrayDataset, RBME
    from random import sample, seed
    
    # set random seed if one is provided
    if randseed is not None:
        seed(randseed)
    
    PDO = ['PAPA','DR','other']
    columns = ['y','x','I0','bg','y_err','x_err','I0_err','bg_err','H_det','error_flag', 
          'snr','rmse','n_iter','y_detect','x_detect','frame','loc_idx','trajectory',  
          'subproblem_n_traj','subproblem_n_locs']
    IS = config['illumination_sequence']
    ncycles = IS['ncycles']
    r = IS['r']
    v = IS['v']
    g = IS['g']
    framesPerMovie = ncycles*(4*r+g+v) # total number of frames per movie

    settings = dict(
        likelihood_type = RBME,
        pixel_size_um = config['track']['pixel_size_um'],
        frame_interval = config['track']['frame_interval'],
        focal_depth = config['saspt']['focal_depth'],
        path_col = 'filepath',
        condition_col = 'condition',
        progress_bar = True,
        num_workers=nworkers,
    )
        
    conditions = []
    filepaths = []
    
    # concatenate all trajectories from each category, incrementing the frame and trajectory 
    # indices appropriately to avoid overlaps
    if not os.path.isdir(subsampledFolder):
        os.mkdir(subsampledFolder)    
    for c in config['conditions']:
        traj = [None,None,None] # PAPA, DR, and other trajectories
        maxtrajnum = [0,0,0]    # current maximum trajectory index
        fnum = get_condition_fnum(config['conditions'][c]) 
        i = 0 # movie index
        for f in fnum:
            for j in range(3):
                currdf = pd.read_csv('%s/%s/%s/%s.csv' % (sortedFolder,c,f,PDO[j]))
                if not currdf.empty:
                    if traj[j] is None:
                        # if this is the first file, put it in traj as-is
                        traj[j] = currdf
                        maxtrajnum[j] = currdf['trajectory'].max()
                    else:
                        # if this is not the first file, increment the trajectory indices
                        # currdf['frame'] = currdf['frame'] + i*framesPerMovie
                        currdf['trajectory'] = currdf['trajectory'] + maxtrajnum[j] + 1
                        maxtrajnum[j] = currdf['trajectory'].max()
                        traj[j] = pd.concat([traj[j],currdf],ignore_index=True)
            i = i + 1
        
        

        # include only trajectories with length greater than 1
        ntraj = [0,0,0]
        for j in range(3):
            goodtraj = traj[j]['trajectory'].value_counts()>1
            goodtraj = goodtraj[goodtraj].index # indices of trajectories longer than 1
            sel = traj[j]['trajectory'].isin(goodtraj)
            traj[j] = traj[j][sel] # retain only these trajectories
            ntraj[j] = len(traj[j]['trajectory'].unique())
            
        print(ntraj)   
        
        # export all trajectories from the condition with the fewest trajectories
        # subsample the same number from the two other conditions
        mintraj = min(ntraj)
        whichmin = ntraj.index(mintraj)
        for j in range(3):
            outfname = '%s/%s_%s.csv' % (subsampledFolder, c, PDO[j])
            if j==whichmin: # if this is the one with the fewest trajectories, no need to subsample
                traj[j].to_csv(outfname, index=False, columns=columns, header=True)
            else:
                trajind = list(traj[j]['trajectory'].unique())
                trajind = sample(trajind,mintraj)
                sel = traj[j]['trajectory'].isin(trajind)
                traj[j] = traj[j][sel]
                traj[j].to_csv(outfname, index=False, columns=columns, header=True)
            conditions.append('%s_%s' % (c,PDO[j]))
            filepaths.append(outfname)
    
    paths = pd.DataFrame({'condition':conditions, 'filepath':filepaths})
    
    SAD = StateArrayDataset.from_kwargs(paths, **settings)

    return [SAD,paths]
    

def analyze_PAPA_DR_stateArray(config,sortedFolder='sortedTrajectories',nworkers=1):
    """
    [SAD,posterior_occs,condition_names] = 
        analyze_PAPA_DR_stateArray(config,sortedFolder='sortedTrajectories',nworkers=1):
    
    Make StateArray for the entire dataset of sorted PAPA, DR, and other trajectories,
    and then infer posterior probability distribution by condition.
    
    Requires saspt (https://github.com/alecheckert/saspt)
    
    args
        config  :   dict - configuration settings
        sortedFolder    :   directory where sorted PAPA/DR/other trajectories are stored
        subsampledFolder    :   directory where subsampled trajectories will be stored
        nworkers    :   number of workers to use for the computation; do not set this to
                        a value higher than the number of CPUs on your computer.
        randseed    :   (optional) number to serve as a random seed to initialize the          
                        random number generator (for reproducing the same "random" output when
                        the code is re-run)
    
    returns
        [SAD,posterior_occs,condition_names]
        state array dataset, posterior occupations for each condition, names of each condition
    
    """
    
    import pickle
    
    [SAD,paths]=makeSA_sorted(config,sortedFolder=sortedFolder,nworkers=nworkers)
    posterior_occs, condition_names = SAD.infer_posterior_by_condition('condition')
    D = SAD.likelihood.diff_coefs
    plot_PAPA_DR(config,D,posterior_occs,condition_names,'figures')
    with open('state_array_pickle','wb') as fh:
        pickle.dump([SAD,posterior_occs,condition_names],fh)
    # to do: Write this all out to csv files as well
    return [SAD,posterior_occs,condition_names]


def analyze_PAPA_DR_stateArray_sameN(config,sortedFolder='sortedTrajectories',
                subsampledFolder='subsampledTrajectories',
                nworkers=1,randseed=None):
    """
    [SAD,posterior_occs,condition_names] = 
        analyze_PAPA_DR_stateArray(config,sortedFolder='sortedTrajectories',nworkers=1):
    
    Make StateArray for the entire dataset of sorted PAPA, DR, and other trajectories,
    and then infer posterior probability distribution by condition.
    
    Requires saspt (https://github.com/alecheckert/saspt)
    
    args
        config  :   dict - configuration settings
        sortedFolder    :   directory where sorted PAPA/DR/other trajectories are stored
        nworkers    :   number of workers to use for the computation; do not set this to
                        a value higher than the number of CPUs on your computer.
    
    returns
        [SAD,posterior_occs,condition_names]
        state array dataset, posterior occupations for each condition, names of each condition
    
    """
    
    import pickle
    
    [SAD,paths]=makeSA_sorted_sameN(config,sortedFolder=sortedFolder,
            nworkers=nworkers,randseed=randseed)
    posterior_occs, condition_names = SAD.infer_posterior_by_condition('condition')
    D = SAD.likelihood.diff_coefs
    plot_PAPA_DR(config,D,posterior_occs,condition_names,'figures_sameN')
    with open('state_array_sameN_pickle','wb') as fh:
        pickle.dump([SAD,posterior_occs,condition_names],fh)
    # to do: Write this all out to csv files as well
    return [SAD,posterior_occs,condition_names]

def plot_PAPA_DR(config,D,posterior_occs,condition_names,figfname='figures'):
    """
    plot_PAPA_DR(config,D,posterior_occs,condition_names,figfname)
    
    args:
        config  :   dict of configuration settings
        D   :   array of diffusion coefficients
        posterior_occs  :   list of lists of posterior occupations
        condition_names :   list of associated condition names
        figfname    :   name of output folder for storing figures
    """
    
    # get posterior occupations for each condition
    # This either uses the pre-calculated values or runs the calculation if it is 
    # not yet calculated.
    
    if not os.path.isdir(figfname):
        os.mkdir(figfname)
    if not os.path.isdir(figfname + '/PAPA_vs_DR_Dspectra'):
        os.mkdir(figfname + '/PAPA_vs_DR_Dspectra')
    if not os.path.isdir(figfname + '/PAPA_vs_DR_Dspectra/csvs'):
        os.mkdir(figfname + '/PAPA_vs_DR_Dspectra/csvs')
        
    #posterior_occs, condition_names = SAD.infer_posterior_by_condition('condition')
    
    from saspt import normalize_2d
    from matplotlib import pyplot as plt
    font = {'family' : 'normal',
            'weight' : 'normal',
            'size'   : 12}
    plt.rc('font', **font)

    posterior_occs = normalize_2d(posterior_occs, axis=1)
    podict = {}
    for j in range(posterior_occs.shape[0]):
        currc = condition_names[j]
        podict[currc] = posterior_occs[j,:]
        pd.DataFrame({'D':D,'P':podict[currc]}).to_csv(
            figfname + '/PAPA_vs_DR_Dspectra/csvs/%s.csv' % currc)
    for c in config['conditions']:
        plt.figure(c)
        plt.title(config['conditions'][c]['title'])
        plt.plot(D,podict[c+'_PAPA'],'g-')
        plt.plot(D,podict[c+'_DR'],'-',color='#A000A0')
        plt.xscale('log')
        plt.xlabel('Diffusion coefficient ($\mu$m$^{2}$ s$^{-1}$)')
        plt.ylabel('Mean posterior occupation')
        plt.savefig(figfname + '/PAPA_vs_DR_Dspectra/%s.png' % c,format='png')
        
        
def plot_Nlocs_wholemovie(config,figfname='figures'):
    """
    plot_Nlocs_wholemovie(config)
    
    args:
        config  :   dict of configuration settings
        figfname:   folder name for storing figures
        
    For each condition specified in config, makes a plot of average number of localizations 
    as a function of frame number.
    """
    from matplotlib import pyplot as plt
    font = {'family' : 'normal',
            'weight' : 'normal',
            'size'   : 12}
    plt.rc('font', **font)

    IS = config['illumination_sequence']
    ncycles = IS['ncycles']
    r = IS['r']
    v = IS['v']
    g = IS['g']
  
    nframes = ncycles*(4*r + v + g)
    
    formatstring = '%%s%%0%dd.csv' % config['file_format']['ndigits']
    
    
    for c in config['conditions']:
        allframenums = pd.Series()
        currc = config['conditions'][c]
        fnum = get_condition_fnum(currc)
        for f in fnum:
            fname = [c,currc['basefname'],f,formatstring % (currc['basefname'],f)]
            currdata = pd.read_csv(fname[3])
            allframenums=allframenums.append(currdata['frame'])

        meanlocs = allframenums.value_counts(bins=range(-1,nframes)).sort_index().values/len(fnum)
        
        np.savetxt(figfname + '/locsperframe/%s.csv' % c, meanlocs, delimiter=',')
        
        plt.figure(c)
        plt.title(config['conditions'][c]['title'])
        plt.plot(range(nframes),meanlocs)
        plt.xlabel('Frame number')
        plt.ylabel('Mean localization number')
        os.system('mkdir %s' % figfname)
        os.system('mkdir %s' % (figfname + '/locsperframe/'))
        plt.savefig(figfname + '/locsperframe/%s.png' % c,format='png')
    
def plot_Nlocs_bycycle(config,figfname='figures'):
    """
    plot_Nlocs_wholemovie(config)
    
    args:
        config  :   dict of configuration settings
        figfname:   folder name for storing figures
        
    For each condition specified in config, makes a plot of average number of localizations 
    as a function of frame number.
    """
    from matplotlib import pyplot as plt
    font = {'family' : 'normal',
            'weight' : 'normal',
            'size'   : 12}
    plt.rc('font', **font)

    IS = config['illumination_sequence']
    ncycles = IS['ncycles']
    r = IS['r']
    v = IS['v']
    g = IS['g']
    cyclelen = 4*r + v + g
    nframes = ncycles*cyclelen
    
    formatstring = '%%s%%0%dd.csv' % config['file_format']['ndigits']
    
    
    
    for c in config['conditions']:
        allframenums = pd.Series()
        currc = config['conditions'][c]
        fnum = get_condition_fnum(currc)
        for f in fnum:
            fname = [c,currc['basefname'],f,formatstring % (currc['basefname'],f)]
            currdata = pd.read_csv(fname[3])
            allframenums=allframenums.append(currdata['frame'] % cyclelen) # do modular division by length of 1 cycle

        meanlocs = allframenums.value_counts(bins=range(-1,cyclelen)).sort_index().values/(len(fnum) * ncycles)
        
        np.savetxt(figfname + '/locsperframe_cycle/%s.csv' % c, meanlocs, delimiter=',')
        
        plt.figure(c)
        plt.title(config['conditions'][c]['title'])
        plt.plot(range(cyclelen),meanlocs)
        plt.xlabel('Frame number')
        plt.ylabel('Mean localization number')
        os.system('mkdir %s' % figfname)
        os.system('mkdir %s' % (figfname + '/locsperframe_cycle/'))
        plt.savefig(figfname + '/locsperframe_cycle/%s.png' % c,format='png')
 

# To do:

# Write a "plotSA" function that plots nice-looking PAPA vs. DR plots for all conditions







Software Heritage — Copyright (C) 2015–2025, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Contact— JavaScript license information— Web API

back to top