https://gitlab.com/tgwgraham/papacode_v2
Tip revision: 77dbcca3d4ca2833a5d051d495d52444ba34ac27 authored by Thomas Graham on 29 September 2022, 16:03:20 UTC
Add LICENSE
Add LICENSE
Tip revision: 77dbcca
papa_utils.py
"""
papa_utils.py -- utilities for PAPA-SPT analysis
"""
import os
# Read config files in TOML format
import toml
import pandas as pd
import numpy as np
def read_config(path):
"""
Read a config file.
args
path : str
returns
dict
"""
assert os.path.isfile(path), \
"read_config: path %s does not exist" % path
return toml.load(path)
def save_config(path, config):
"""
Save data in a dict to a TOML file.
args
path : str
config : dict
"""
with open(path, 'w') as o:
toml.dump(config, o)
def get_condition_fnum(currc):
"""
get_condition_fnum(currc):
args
currc : dict corresponding to a single experimental condition
returns
list of file numbers associated with that condition
"""
if 'fnum' in currc.keys():
return currc['fnum']
elif 'first' in currc.keys() and 'last' in currc.keys():
fnum = list(range(currc['first'],currc['last']+1))
if 'exclude' in currc.keys():
fnum = [x for x in fnum if x not in currc['exclude']]
else:
raise Exception('File numbers must be defined for each condition in the input file '\
'using either the \'fnum\' input or both the \'first\' and \'last\' inputs with '\
'optional \'exclude\' input.')
return fnum
def list_movies(config):
"""
List movie file names specified by condition list in config dictionary
args
config : dict
returns
list of file names
"""
fnames = []
# format string for movie file names
formatstring = '%%s%%0%dd.%s' % (config['file_format']['ndigits'],config['file_format']['extension'])
for c in config['conditions']:
currc = config['conditions'][c]
if 'basefname' not in currc.keys():
raise Exception('You must include a basefname for every condition')
else:
fnum = get_condition_fnum(currc)
for f in fnum:
fnames.append(formatstring % (currc['basefname'],f))
return fnames
def list_csvs(config):
"""
List csv file names specified by condition list in config dictionary
args
config : dict
returns
list of one array per CSV file with elements:
0: condition name
1: basefname
2: file number
3: CSV file name
"""
fnames = []
# format string for movie file names
formatstring = '%%s%%0%dd.csv' % config['file_format']['ndigits']
for c in config['conditions']:
currc = config['conditions'][c]
if 'basefname' not in currc.keys():
raise Exception('You must include a basefname for every condition')
else:
fnum = get_condition_fnum(currc)
for f in fnum:
fnames.append([c,currc['basefname'],f,formatstring % (currc['basefname'],f)])
return fnames
def sort_PAPA_DR(config):
"""
sort_PAPA_DR(config)
Sort PAPA trajectories from DR trajectories given the input dictionary config.
Write out to a new folder called "sortedTrajectories"
args
config : dict
"""
# make output directory
os.system('mkdir sortedTrajectories')
# loop over tracked csv files and sort each one
csvfnames = list_csvs(config)
for f in csvfnames:
# sort PAPA, DR, and other segments from this csv file
write_sorted_segments(f,config,outf='sortedTrajectories')
def write_sorted_segments(f,config,outf='sortedTrajectories'):
"""
write_sorted_segments(f,config):
Write out new csv files corresponding to PAPA, DR, and other segments from
trajectory csv file f.
By default, the output will be stored in folder sortedTrajectories.
Within the output directory, the function will create a subfolder for each experimental condition (i.e., keys of "conditions" sub-dictionary)
Within each of these subfolders, it will create a subfolder for each distinct csv file associated with that condition.
Finally, within that subfolder, it will write out files called PAPA.csv, DR.csv, and other.csv, corresponding to trajectories
for PAPA, DR, and other trajectories, respectively.
args
f - 3-element array containing base file name, file number, and CSV file name for a given CSV data file
config - configuration settings (dict)
outf - output directory name (string)
"""
IS = config['illumination_sequence']
ncycles = IS['ncycles']
r = IS['r']
v = IS['v']
g = IS['g']
fw = IS['framewindow']
t = 4*r + v + g
data = pd.read_csv(f[3])
fwrange = np.arange(0,fw)
gpost = np.arange(0,ncycles)*t + 3*r + v + g
PAPAframes = gpost.reshape(ncycles,1) + fwrange
PAPAframes = PAPAframes.flatten()
vpost = np.arange(0,ncycles)*t + r + v
DRframes = vpost.reshape(ncycles,1) + fwrange
DRframes = DRframes.flatten()
PAPAtraj = data[data['frame'].isin(PAPAframes)]
DRtraj = data[data['frame'].isin(DRframes)]
othertraj = data[~data['frame'].isin(PAPAframes) & ~data['frame'].isin(DRframes)]
columns = ['y','x','I0','bg','y_err','x_err','I0_err','bg_err','H_det','error_flag',
'snr','rmse','n_iter','y_detect','x_detect','frame','loc_idx','trajectory',
'subproblem_n_traj','subproblem_n_locs']
os.system('mkdir -p %s/%s/%s' % (outf,f[0],f[2]))
PAPAtraj.to_csv('%s/%s/%s/PAPA.csv' % (outf,f[0],f[2]), index=False, columns=columns)
DRtraj.to_csv('%s/%s/%s/DR.csv' % (outf,f[0],f[2]), index=False, columns=columns)
othertraj.to_csv('%s/%s/%s/other.csv' % (outf,f[0],f[2]), index=False, columns=columns)
def makeSA_sorted(config,sortedFolder='sortedTrajectories',nworkers=1):
"""
makeSAD_sorted(config,sortedFolder='sortedTrajectories',nworkers=1):
Make StateArray for the entire dataset of sorted PAPA, DR, and other trajectories.
Requires saspt (https://github.com/alecheckert/saspt)
args
config : dict - configuration settings
sortedFolder : directory where sorted PAPA/DR/other trajectories are stored
nworkers : number of workers to use for the computation; do not set this to
a value higher than the number of CPUs on your computer.
"""
from saspt import StateArrayDataset, RBME
PDO = ['PAPA','DR','other']
settings = dict(
likelihood_type = RBME,
pixel_size_um = config['track']['pixel_size_um'],
frame_interval = config['track']['frame_interval'],
focal_depth = config['saspt']['focal_depth'],
path_col = 'filepath',
condition_col = 'condition',
progress_bar = True,
num_workers=nworkers,
)
conditions = []
filepaths = []
for c in config['conditions']:
fnum = get_condition_fnum(config['conditions'][c])
for f in fnum:
for j in range(3):
conditions.append('%s_%s' % (c,PDO[j]))
filepaths.append('%s/%s/%s/%s.csv' % (sortedFolder,c,f,PDO[j]))
paths = pd.DataFrame({'condition':conditions, 'filepath':filepaths})
SAD = StateArrayDataset.from_kwargs(paths, **settings)
return [SAD,paths]
def makeSA_sorted_sameN(config,sortedFolder='sortedTrajectories',
subsampledFolder='subsampledTrajectories',nworkers=1,randseed=None):
"""
makeSA_sorted_sameN(config,sortedFolder='sortedTrajectories',nworkers=1,randseed=None):
Make StateArray for the entire dataset of sorted PAPA, DR, and other trajectories.
Identify the sorted category with the fewest trajectories, and randomly subsample the same number of trajectories from each of the other two categories.
Requires saspt (https://github.com/alecheckert/saspt)
args
config : dict - configuration settings
sortedFolder : directory where sorted PAPA/DR/other trajectories are stored
subsampledFolder : directory where subsampled trajectories will be stored
nworkers : number of workers to use for the computation; do not set this to
a value higher than the number of CPUs on your computer.
randseed : (optional) number to serve as a random seed to initialize the
random number generator (for reproducing the same "random" output when
the code is re-run)
"""
from saspt import StateArrayDataset, RBME
from random import sample, seed
# set random seed if one is provided
if randseed is not None:
seed(randseed)
PDO = ['PAPA','DR','other']
columns = ['y','x','I0','bg','y_err','x_err','I0_err','bg_err','H_det','error_flag',
'snr','rmse','n_iter','y_detect','x_detect','frame','loc_idx','trajectory',
'subproblem_n_traj','subproblem_n_locs']
IS = config['illumination_sequence']
ncycles = IS['ncycles']
r = IS['r']
v = IS['v']
g = IS['g']
framesPerMovie = ncycles*(4*r+g+v) # total number of frames per movie
settings = dict(
likelihood_type = RBME,
pixel_size_um = config['track']['pixel_size_um'],
frame_interval = config['track']['frame_interval'],
focal_depth = config['saspt']['focal_depth'],
path_col = 'filepath',
condition_col = 'condition',
progress_bar = True,
num_workers=nworkers,
)
conditions = []
filepaths = []
# concatenate all trajectories from each category, incrementing the frame and trajectory
# indices appropriately to avoid overlaps
if not os.path.isdir(subsampledFolder):
os.mkdir(subsampledFolder)
for c in config['conditions']:
traj = [None,None,None] # PAPA, DR, and other trajectories
maxtrajnum = [0,0,0] # current maximum trajectory index
fnum = get_condition_fnum(config['conditions'][c])
i = 0 # movie index
for f in fnum:
for j in range(3):
currdf = pd.read_csv('%s/%s/%s/%s.csv' % (sortedFolder,c,f,PDO[j]))
if not currdf.empty:
if traj[j] is None:
# if this is the first file, put it in traj as-is
traj[j] = currdf
maxtrajnum[j] = currdf['trajectory'].max()
else:
# if this is not the first file, increment the trajectory indices
# currdf['frame'] = currdf['frame'] + i*framesPerMovie
currdf['trajectory'] = currdf['trajectory'] + maxtrajnum[j] + 1
maxtrajnum[j] = currdf['trajectory'].max()
traj[j] = pd.concat([traj[j],currdf],ignore_index=True)
i = i + 1
# include only trajectories with length greater than 1
ntraj = [0,0,0]
for j in range(3):
goodtraj = traj[j]['trajectory'].value_counts()>1
goodtraj = goodtraj[goodtraj].index # indices of trajectories longer than 1
sel = traj[j]['trajectory'].isin(goodtraj)
traj[j] = traj[j][sel] # retain only these trajectories
ntraj[j] = len(traj[j]['trajectory'].unique())
print(ntraj)
# export all trajectories from the condition with the fewest trajectories
# subsample the same number from the two other conditions
mintraj = min(ntraj)
whichmin = ntraj.index(mintraj)
for j in range(3):
outfname = '%s/%s_%s.csv' % (subsampledFolder, c, PDO[j])
if j==whichmin: # if this is the one with the fewest trajectories, no need to subsample
traj[j].to_csv(outfname, index=False, columns=columns, header=True)
else:
trajind = list(traj[j]['trajectory'].unique())
trajind = sample(trajind,mintraj)
sel = traj[j]['trajectory'].isin(trajind)
traj[j] = traj[j][sel]
traj[j].to_csv(outfname, index=False, columns=columns, header=True)
conditions.append('%s_%s' % (c,PDO[j]))
filepaths.append(outfname)
paths = pd.DataFrame({'condition':conditions, 'filepath':filepaths})
SAD = StateArrayDataset.from_kwargs(paths, **settings)
return [SAD,paths]
def analyze_PAPA_DR_stateArray(config,sortedFolder='sortedTrajectories',nworkers=1):
"""
[SAD,posterior_occs,condition_names] =
analyze_PAPA_DR_stateArray(config,sortedFolder='sortedTrajectories',nworkers=1):
Make StateArray for the entire dataset of sorted PAPA, DR, and other trajectories,
and then infer posterior probability distribution by condition.
Requires saspt (https://github.com/alecheckert/saspt)
args
config : dict - configuration settings
sortedFolder : directory where sorted PAPA/DR/other trajectories are stored
subsampledFolder : directory where subsampled trajectories will be stored
nworkers : number of workers to use for the computation; do not set this to
a value higher than the number of CPUs on your computer.
randseed : (optional) number to serve as a random seed to initialize the
random number generator (for reproducing the same "random" output when
the code is re-run)
returns
[SAD,posterior_occs,condition_names]
state array dataset, posterior occupations for each condition, names of each condition
"""
import pickle
[SAD,paths]=makeSA_sorted(config,sortedFolder=sortedFolder,nworkers=nworkers)
posterior_occs, condition_names = SAD.infer_posterior_by_condition('condition')
D = SAD.likelihood.diff_coefs
plot_PAPA_DR(config,D,posterior_occs,condition_names,'figures')
with open('state_array_pickle','wb') as fh:
pickle.dump([SAD,posterior_occs,condition_names],fh)
# to do: Write this all out to csv files as well
return [SAD,posterior_occs,condition_names]
def analyze_PAPA_DR_stateArray_sameN(config,sortedFolder='sortedTrajectories',
subsampledFolder='subsampledTrajectories',
nworkers=1,randseed=None):
"""
[SAD,posterior_occs,condition_names] =
analyze_PAPA_DR_stateArray(config,sortedFolder='sortedTrajectories',nworkers=1):
Make StateArray for the entire dataset of sorted PAPA, DR, and other trajectories,
and then infer posterior probability distribution by condition.
Requires saspt (https://github.com/alecheckert/saspt)
args
config : dict - configuration settings
sortedFolder : directory where sorted PAPA/DR/other trajectories are stored
nworkers : number of workers to use for the computation; do not set this to
a value higher than the number of CPUs on your computer.
returns
[SAD,posterior_occs,condition_names]
state array dataset, posterior occupations for each condition, names of each condition
"""
import pickle
[SAD,paths]=makeSA_sorted_sameN(config,sortedFolder=sortedFolder,
nworkers=nworkers,randseed=randseed)
posterior_occs, condition_names = SAD.infer_posterior_by_condition('condition')
D = SAD.likelihood.diff_coefs
plot_PAPA_DR(config,D,posterior_occs,condition_names,'figures_sameN')
with open('state_array_sameN_pickle','wb') as fh:
pickle.dump([SAD,posterior_occs,condition_names],fh)
# to do: Write this all out to csv files as well
return [SAD,posterior_occs,condition_names]
def plot_PAPA_DR(config,D,posterior_occs,condition_names,figfname='figures'):
"""
plot_PAPA_DR(config,D,posterior_occs,condition_names,figfname)
args:
config : dict of configuration settings
D : array of diffusion coefficients
posterior_occs : list of lists of posterior occupations
condition_names : list of associated condition names
figfname : name of output folder for storing figures
"""
# get posterior occupations for each condition
# This either uses the pre-calculated values or runs the calculation if it is
# not yet calculated.
if not os.path.isdir(figfname):
os.mkdir(figfname)
if not os.path.isdir(figfname + '/PAPA_vs_DR_Dspectra'):
os.mkdir(figfname + '/PAPA_vs_DR_Dspectra')
if not os.path.isdir(figfname + '/PAPA_vs_DR_Dspectra/csvs'):
os.mkdir(figfname + '/PAPA_vs_DR_Dspectra/csvs')
#posterior_occs, condition_names = SAD.infer_posterior_by_condition('condition')
from saspt import normalize_2d
from matplotlib import pyplot as plt
font = {'family' : 'normal',
'weight' : 'normal',
'size' : 12}
plt.rc('font', **font)
posterior_occs = normalize_2d(posterior_occs, axis=1)
podict = {}
for j in range(posterior_occs.shape[0]):
currc = condition_names[j]
podict[currc] = posterior_occs[j,:]
pd.DataFrame({'D':D,'P':podict[currc]}).to_csv(
figfname + '/PAPA_vs_DR_Dspectra/csvs/%s.csv' % currc)
for c in config['conditions']:
plt.figure(c)
plt.title(config['conditions'][c]['title'])
plt.plot(D,podict[c+'_PAPA'],'g-')
plt.plot(D,podict[c+'_DR'],'-',color='#A000A0')
plt.xscale('log')
plt.xlabel('Diffusion coefficient ($\mu$m$^{2}$ s$^{-1}$)')
plt.ylabel('Mean posterior occupation')
plt.savefig(figfname + '/PAPA_vs_DR_Dspectra/%s.png' % c,format='png')
def plot_Nlocs_wholemovie(config,figfname='figures'):
"""
plot_Nlocs_wholemovie(config)
args:
config : dict of configuration settings
figfname: folder name for storing figures
For each condition specified in config, makes a plot of average number of localizations
as a function of frame number.
"""
from matplotlib import pyplot as plt
font = {'family' : 'normal',
'weight' : 'normal',
'size' : 12}
plt.rc('font', **font)
IS = config['illumination_sequence']
ncycles = IS['ncycles']
r = IS['r']
v = IS['v']
g = IS['g']
nframes = ncycles*(4*r + v + g)
formatstring = '%%s%%0%dd.csv' % config['file_format']['ndigits']
for c in config['conditions']:
allframenums = pd.Series()
currc = config['conditions'][c]
fnum = get_condition_fnum(currc)
for f in fnum:
fname = [c,currc['basefname'],f,formatstring % (currc['basefname'],f)]
currdata = pd.read_csv(fname[3])
allframenums=allframenums.append(currdata['frame'])
meanlocs = allframenums.value_counts(bins=range(-1,nframes)).sort_index().values/len(fnum)
np.savetxt(figfname + '/locsperframe/%s.csv' % c, meanlocs, delimiter=',')
plt.figure(c)
plt.title(config['conditions'][c]['title'])
plt.plot(range(nframes),meanlocs)
plt.xlabel('Frame number')
plt.ylabel('Mean localization number')
os.system('mkdir %s' % figfname)
os.system('mkdir %s' % (figfname + '/locsperframe/'))
plt.savefig(figfname + '/locsperframe/%s.png' % c,format='png')
def plot_Nlocs_bycycle(config,figfname='figures'):
"""
plot_Nlocs_wholemovie(config)
args:
config : dict of configuration settings
figfname: folder name for storing figures
For each condition specified in config, makes a plot of average number of localizations
as a function of frame number.
"""
from matplotlib import pyplot as plt
font = {'family' : 'normal',
'weight' : 'normal',
'size' : 12}
plt.rc('font', **font)
IS = config['illumination_sequence']
ncycles = IS['ncycles']
r = IS['r']
v = IS['v']
g = IS['g']
cyclelen = 4*r + v + g
nframes = ncycles*cyclelen
formatstring = '%%s%%0%dd.csv' % config['file_format']['ndigits']
for c in config['conditions']:
allframenums = pd.Series()
currc = config['conditions'][c]
fnum = get_condition_fnum(currc)
for f in fnum:
fname = [c,currc['basefname'],f,formatstring % (currc['basefname'],f)]
currdata = pd.read_csv(fname[3])
allframenums=allframenums.append(currdata['frame'] % cyclelen) # do modular division by length of 1 cycle
meanlocs = allframenums.value_counts(bins=range(-1,cyclelen)).sort_index().values/(len(fnum) * ncycles)
np.savetxt(figfname + '/locsperframe_cycle/%s.csv' % c, meanlocs, delimiter=',')
plt.figure(c)
plt.title(config['conditions'][c]['title'])
plt.plot(range(cyclelen),meanlocs)
plt.xlabel('Frame number')
plt.ylabel('Mean localization number')
os.system('mkdir %s' % figfname)
os.system('mkdir %s' % (figfname + '/locsperframe_cycle/'))
plt.savefig(figfname + '/locsperframe_cycle/%s.png' % c,format='png')
# To do:
# Write a "plotSA" function that plots nice-looking PAPA vs. DR plots for all conditions