""" papa_utils.py -- utilities for PAPA-SPT analysis """ import os # Read config files in TOML format import toml import pandas as pd import numpy as np def read_config(path): """ Read a config file. args path : str returns dict """ assert os.path.isfile(path), \ "read_config: path %s does not exist" % path return toml.load(path) def save_config(path, config): """ Save data in a dict to a TOML file. args path : str config : dict """ with open(path, 'w') as o: toml.dump(config, o) def get_condition_fnum(currc): """ get_condition_fnum(currc): args currc : dict corresponding to a single experimental condition returns list of file numbers associated with that condition """ if 'fnum' in currc.keys(): return currc['fnum'] elif 'first' in currc.keys() and 'last' in currc.keys(): fnum = list(range(currc['first'],currc['last']+1)) if 'exclude' in currc.keys(): fnum = [x for x in fnum if x not in currc['exclude']] else: raise Exception('File numbers must be defined for each condition in the input file '\ 'using either the \'fnum\' input or both the \'first\' and \'last\' inputs with '\ 'optional \'exclude\' input.') return fnum def list_movies(config): """ List movie file names specified by condition list in config dictionary args config : dict returns list of file names """ fnames = [] # format string for movie file names formatstring = '%%s%%0%dd.%s' % (config['file_format']['ndigits'],config['file_format']['extension']) for c in config['conditions']: currc = config['conditions'][c] if 'basefname' not in currc.keys(): raise Exception('You must include a basefname for every condition') else: fnum = get_condition_fnum(currc) for f in fnum: fnames.append(formatstring % (currc['basefname'],f)) return fnames def list_csvs(config): """ List csv file names specified by condition list in config dictionary args config : dict returns list of one array per CSV file with elements: 0: condition name 1: basefname 2: file number 3: CSV file name """ fnames = [] # format string for movie file names formatstring = '%%s%%0%dd.csv' % config['file_format']['ndigits'] for c in config['conditions']: currc = config['conditions'][c] if 'basefname' not in currc.keys(): raise Exception('You must include a basefname for every condition') else: fnum = get_condition_fnum(currc) for f in fnum: fnames.append([c,currc['basefname'],f,formatstring % (currc['basefname'],f)]) return fnames def sort_PAPA_DR(config): """ sort_PAPA_DR(config) Sort PAPA trajectories from DR trajectories given the input dictionary config. Write out to a new folder called "sortedTrajectories" args config : dict """ # make output directory os.system('mkdir sortedTrajectories') # loop over tracked csv files and sort each one csvfnames = list_csvs(config) for f in csvfnames: # sort PAPA, DR, and other segments from this csv file write_sorted_segments(f,config,outf='sortedTrajectories') def write_sorted_segments(f,config,outf='sortedTrajectories'): """ write_sorted_segments(f,config): Write out new csv files corresponding to PAPA, DR, and other segments from trajectory csv file f. By default, the output will be stored in folder sortedTrajectories. Within the output directory, the function will create a subfolder for each experimental condition (i.e., keys of "conditions" sub-dictionary) Within each of these subfolders, it will create a subfolder for each distinct csv file associated with that condition. Finally, within that subfolder, it will write out files called PAPA.csv, DR.csv, and other.csv, corresponding to trajectories for PAPA, DR, and other trajectories, respectively. args f - 3-element array containing base file name, file number, and CSV file name for a given CSV data file config - configuration settings (dict) outf - output directory name (string) """ IS = config['illumination_sequence'] ncycles = IS['ncycles'] r = IS['r'] v = IS['v'] g = IS['g'] fw = IS['framewindow'] t = 4*r + v + g data = pd.read_csv(f[3]) fwrange = np.arange(0,fw) gpost = np.arange(0,ncycles)*t + 3*r + v + g PAPAframes = gpost.reshape(ncycles,1) + fwrange PAPAframes = PAPAframes.flatten() vpost = np.arange(0,ncycles)*t + r + v DRframes = vpost.reshape(ncycles,1) + fwrange DRframes = DRframes.flatten() PAPAtraj = data[data['frame'].isin(PAPAframes)] DRtraj = data[data['frame'].isin(DRframes)] othertraj = data[~data['frame'].isin(PAPAframes) & ~data['frame'].isin(DRframes)] columns = ['y','x','I0','bg','y_err','x_err','I0_err','bg_err','H_det','error_flag', 'snr','rmse','n_iter','y_detect','x_detect','frame','loc_idx','trajectory', 'subproblem_n_traj','subproblem_n_locs'] os.system('mkdir -p %s/%s/%s' % (outf,f[0],f[2])) PAPAtraj.to_csv('%s/%s/%s/PAPA.csv' % (outf,f[0],f[2]), index=False, columns=columns) DRtraj.to_csv('%s/%s/%s/DR.csv' % (outf,f[0],f[2]), index=False, columns=columns) othertraj.to_csv('%s/%s/%s/other.csv' % (outf,f[0],f[2]), index=False, columns=columns) def makeSA_sorted(config,sortedFolder='sortedTrajectories',nworkers=1): """ makeSAD_sorted(config,sortedFolder='sortedTrajectories',nworkers=1): Make StateArray for the entire dataset of sorted PAPA, DR, and other trajectories. Requires saspt (https://github.com/alecheckert/saspt) args config : dict - configuration settings sortedFolder : directory where sorted PAPA/DR/other trajectories are stored nworkers : number of workers to use for the computation; do not set this to a value higher than the number of CPUs on your computer. """ from saspt import StateArrayDataset, RBME PDO = ['PAPA','DR','other'] settings = dict( likelihood_type = RBME, pixel_size_um = config['track']['pixel_size_um'], frame_interval = config['track']['frame_interval'], focal_depth = config['saspt']['focal_depth'], path_col = 'filepath', condition_col = 'condition', progress_bar = True, num_workers=nworkers, ) conditions = [] filepaths = [] for c in config['conditions']: fnum = get_condition_fnum(config['conditions'][c]) for f in fnum: for j in range(3): conditions.append('%s_%s' % (c,PDO[j])) filepaths.append('%s/%s/%s/%s.csv' % (sortedFolder,c,f,PDO[j])) paths = pd.DataFrame({'condition':conditions, 'filepath':filepaths}) SAD = StateArrayDataset.from_kwargs(paths, **settings) return [SAD,paths] def makeSA_sorted_sameN(config,sortedFolder='sortedTrajectories', subsampledFolder='subsampledTrajectories',nworkers=1,randseed=None): """ makeSA_sorted_sameN(config,sortedFolder='sortedTrajectories',nworkers=1,randseed=None): Make StateArray for the entire dataset of sorted PAPA, DR, and other trajectories. Identify the sorted category with the fewest trajectories, and randomly subsample the same number of trajectories from each of the other two categories. Requires saspt (https://github.com/alecheckert/saspt) args config : dict - configuration settings sortedFolder : directory where sorted PAPA/DR/other trajectories are stored subsampledFolder : directory where subsampled trajectories will be stored nworkers : number of workers to use for the computation; do not set this to a value higher than the number of CPUs on your computer. randseed : (optional) number to serve as a random seed to initialize the random number generator (for reproducing the same "random" output when the code is re-run) """ from saspt import StateArrayDataset, RBME from random import sample, seed # set random seed if one is provided if randseed is not None: seed(randseed) PDO = ['PAPA','DR','other'] columns = ['y','x','I0','bg','y_err','x_err','I0_err','bg_err','H_det','error_flag', 'snr','rmse','n_iter','y_detect','x_detect','frame','loc_idx','trajectory', 'subproblem_n_traj','subproblem_n_locs'] IS = config['illumination_sequence'] ncycles = IS['ncycles'] r = IS['r'] v = IS['v'] g = IS['g'] framesPerMovie = ncycles*(4*r+g+v) # total number of frames per movie settings = dict( likelihood_type = RBME, pixel_size_um = config['track']['pixel_size_um'], frame_interval = config['track']['frame_interval'], focal_depth = config['saspt']['focal_depth'], path_col = 'filepath', condition_col = 'condition', progress_bar = True, num_workers=nworkers, ) conditions = [] filepaths = [] # concatenate all trajectories from each category, incrementing the frame and trajectory # indices appropriately to avoid overlaps if not os.path.isdir(subsampledFolder): os.mkdir(subsampledFolder) for c in config['conditions']: traj = [None,None,None] # PAPA, DR, and other trajectories maxtrajnum = [0,0,0] # current maximum trajectory index fnum = get_condition_fnum(config['conditions'][c]) i = 0 # movie index for f in fnum: for j in range(3): currdf = pd.read_csv('%s/%s/%s/%s.csv' % (sortedFolder,c,f,PDO[j])) if not currdf.empty: if traj[j] is None: # if this is the first file, put it in traj as-is traj[j] = currdf maxtrajnum[j] = currdf['trajectory'].max() else: # if this is not the first file, increment the trajectory indices # currdf['frame'] = currdf['frame'] + i*framesPerMovie currdf['trajectory'] = currdf['trajectory'] + maxtrajnum[j] + 1 maxtrajnum[j] = currdf['trajectory'].max() traj[j] = pd.concat([traj[j],currdf],ignore_index=True) i = i + 1 # include only trajectories with length greater than 1 ntraj = [0,0,0] for j in range(3): goodtraj = traj[j]['trajectory'].value_counts()>1 goodtraj = goodtraj[goodtraj].index # indices of trajectories longer than 1 sel = traj[j]['trajectory'].isin(goodtraj) traj[j] = traj[j][sel] # retain only these trajectories ntraj[j] = len(traj[j]['trajectory'].unique()) print(ntraj) # export all trajectories from the condition with the fewest trajectories # subsample the same number from the two other conditions mintraj = min(ntraj) whichmin = ntraj.index(mintraj) for j in range(3): outfname = '%s/%s_%s.csv' % (subsampledFolder, c, PDO[j]) if j==whichmin: # if this is the one with the fewest trajectories, no need to subsample traj[j].to_csv(outfname, index=False, columns=columns, header=True) else: trajind = list(traj[j]['trajectory'].unique()) trajind = sample(trajind,mintraj) sel = traj[j]['trajectory'].isin(trajind) traj[j] = traj[j][sel] traj[j].to_csv(outfname, index=False, columns=columns, header=True) conditions.append('%s_%s' % (c,PDO[j])) filepaths.append(outfname) paths = pd.DataFrame({'condition':conditions, 'filepath':filepaths}) SAD = StateArrayDataset.from_kwargs(paths, **settings) return [SAD,paths] def analyze_PAPA_DR_stateArray(config,sortedFolder='sortedTrajectories',nworkers=1): """ [SAD,posterior_occs,condition_names] = analyze_PAPA_DR_stateArray(config,sortedFolder='sortedTrajectories',nworkers=1): Make StateArray for the entire dataset of sorted PAPA, DR, and other trajectories, and then infer posterior probability distribution by condition. Requires saspt (https://github.com/alecheckert/saspt) args config : dict - configuration settings sortedFolder : directory where sorted PAPA/DR/other trajectories are stored subsampledFolder : directory where subsampled trajectories will be stored nworkers : number of workers to use for the computation; do not set this to a value higher than the number of CPUs on your computer. randseed : (optional) number to serve as a random seed to initialize the random number generator (for reproducing the same "random" output when the code is re-run) returns [SAD,posterior_occs,condition_names] state array dataset, posterior occupations for each condition, names of each condition """ import pickle [SAD,paths]=makeSA_sorted(config,sortedFolder=sortedFolder,nworkers=nworkers) posterior_occs, condition_names = SAD.infer_posterior_by_condition('condition') D = SAD.likelihood.diff_coefs plot_PAPA_DR(config,D,posterior_occs,condition_names,'figures') with open('state_array_pickle','wb') as fh: pickle.dump([SAD,posterior_occs,condition_names],fh) # to do: Write this all out to csv files as well return [SAD,posterior_occs,condition_names] def analyze_PAPA_DR_stateArray_sameN(config,sortedFolder='sortedTrajectories', subsampledFolder='subsampledTrajectories', nworkers=1,randseed=None): """ [SAD,posterior_occs,condition_names] = analyze_PAPA_DR_stateArray(config,sortedFolder='sortedTrajectories',nworkers=1): Make StateArray for the entire dataset of sorted PAPA, DR, and other trajectories, and then infer posterior probability distribution by condition. Requires saspt (https://github.com/alecheckert/saspt) args config : dict - configuration settings sortedFolder : directory where sorted PAPA/DR/other trajectories are stored nworkers : number of workers to use for the computation; do not set this to a value higher than the number of CPUs on your computer. returns [SAD,posterior_occs,condition_names] state array dataset, posterior occupations for each condition, names of each condition """ import pickle [SAD,paths]=makeSA_sorted_sameN(config,sortedFolder=sortedFolder, nworkers=nworkers,randseed=randseed) posterior_occs, condition_names = SAD.infer_posterior_by_condition('condition') D = SAD.likelihood.diff_coefs plot_PAPA_DR(config,D,posterior_occs,condition_names,'figures_sameN') with open('state_array_sameN_pickle','wb') as fh: pickle.dump([SAD,posterior_occs,condition_names],fh) # to do: Write this all out to csv files as well return [SAD,posterior_occs,condition_names] def plot_PAPA_DR(config,D,posterior_occs,condition_names,figfname='figures'): """ plot_PAPA_DR(config,D,posterior_occs,condition_names,figfname) args: config : dict of configuration settings D : array of diffusion coefficients posterior_occs : list of lists of posterior occupations condition_names : list of associated condition names figfname : name of output folder for storing figures """ # get posterior occupations for each condition # This either uses the pre-calculated values or runs the calculation if it is # not yet calculated. if not os.path.isdir(figfname): os.mkdir(figfname) if not os.path.isdir(figfname + '/PAPA_vs_DR_Dspectra'): os.mkdir(figfname + '/PAPA_vs_DR_Dspectra') if not os.path.isdir(figfname + '/PAPA_vs_DR_Dspectra/csvs'): os.mkdir(figfname + '/PAPA_vs_DR_Dspectra/csvs') #posterior_occs, condition_names = SAD.infer_posterior_by_condition('condition') from saspt import normalize_2d from matplotlib import pyplot as plt font = {'family' : 'normal', 'weight' : 'normal', 'size' : 12} plt.rc('font', **font) posterior_occs = normalize_2d(posterior_occs, axis=1) podict = {} for j in range(posterior_occs.shape[0]): currc = condition_names[j] podict[currc] = posterior_occs[j,:] pd.DataFrame({'D':D,'P':podict[currc]}).to_csv( figfname + '/PAPA_vs_DR_Dspectra/csvs/%s.csv' % currc) for c in config['conditions']: plt.figure(c) plt.title(config['conditions'][c]['title']) plt.plot(D,podict[c+'_PAPA'],'g-') plt.plot(D,podict[c+'_DR'],'-',color='#A000A0') plt.xscale('log') plt.xlabel('Diffusion coefficient ($\mu$m$^{2}$ s$^{-1}$)') plt.ylabel('Mean posterior occupation') plt.savefig(figfname + '/PAPA_vs_DR_Dspectra/%s.png' % c,format='png') def plot_Nlocs_wholemovie(config,figfname='figures'): """ plot_Nlocs_wholemovie(config) args: config : dict of configuration settings figfname: folder name for storing figures For each condition specified in config, makes a plot of average number of localizations as a function of frame number. """ from matplotlib import pyplot as plt font = {'family' : 'normal', 'weight' : 'normal', 'size' : 12} plt.rc('font', **font) IS = config['illumination_sequence'] ncycles = IS['ncycles'] r = IS['r'] v = IS['v'] g = IS['g'] nframes = ncycles*(4*r + v + g) formatstring = '%%s%%0%dd.csv' % config['file_format']['ndigits'] for c in config['conditions']: allframenums = pd.Series() currc = config['conditions'][c] fnum = get_condition_fnum(currc) for f in fnum: fname = [c,currc['basefname'],f,formatstring % (currc['basefname'],f)] currdata = pd.read_csv(fname[3]) allframenums=allframenums.append(currdata['frame']) meanlocs = allframenums.value_counts(bins=range(-1,nframes)).sort_index().values/len(fnum) os.makedirs(figfname + '/locsperframe/', exist_ok=True) np.savetxt(figfname + '/locsperframe/%s.csv' % c, meanlocs, delimiter=',') plt.figure(c) plt.title(config['conditions'][c]['title']) plt.plot(range(nframes),meanlocs) plt.xlabel('Frame number') plt.ylabel('Mean localization number') plt.savefig(figfname + '/locsperframe/%s.png' % c,format='png') def plot_Nlocs_bycycle(config,figfname='figures'): """ plot_Nlocs_wholemovie(config) args: config : dict of configuration settings figfname: folder name for storing figures For each condition specified in config, makes a plot of average number of localizations as a function of frame number. """ from matplotlib import pyplot as plt font = {'family' : 'normal', 'weight' : 'normal', 'size' : 12} plt.rc('font', **font) IS = config['illumination_sequence'] ncycles = IS['ncycles'] r = IS['r'] v = IS['v'] g = IS['g'] cyclelen = 4*r + v + g nframes = ncycles*cyclelen formatstring = '%%s%%0%dd.csv' % config['file_format']['ndigits'] for c in config['conditions']: allframenums = pd.Series() currc = config['conditions'][c] fnum = get_condition_fnum(currc) for f in fnum: fname = [c,currc['basefname'],f,formatstring % (currc['basefname'],f)] currdata = pd.read_csv(fname[3]) allframenums=allframenums.append(currdata['frame'] % cyclelen) # do modular division by length of 1 cycle meanlocs = allframenums.value_counts(bins=range(-1,cyclelen)).sort_index().values/(len(fnum) * ncycles) os.makedirs(figfname + '/locsperframe_cycle/', exist_ok=True) np.savetxt(figfname + '/locsperframe_cycle/%s.csv' % c, meanlocs, delimiter=',') plt.figure(c) plt.title(config['conditions'][c]['title']) plt.plot(range(cyclelen),meanlocs) plt.xlabel('Frame number') plt.ylabel('Mean localization number') plt.savefig(figfname + '/locsperframe_cycle/%s.png' % c,format='png') def plot_Nlocs_bycycle_colors(config,figfname='figures'): """ plot_Nlocs_wholemovie(config) args: config : dict of configuration settings figfname: folder name for storing figures For each condition specified in config, makes a plot of average number of localizations as a function of frame number. This version color-codes green and violet the frames that are used to collect PAPA and DR trajectories. """ from matplotlib import pyplot as plt font = {'family' : 'normal', 'weight' : 'normal', 'size' : 12} plt.rc('font', **font) IS = config['illumination_sequence'] ncycles = IS['ncycles'] r = IS['r'] v = IS['v'] g = IS['g'] fw = IS['framewindow'] cyclelen = 4*r + v + g nframes = ncycles*cyclelen formatstring = '%%s%%0%dd.csv' % config['file_format']['ndigits'] for c in config['conditions']: allframenums = pd.Series() currc = config['conditions'][c] fnum = get_condition_fnum(currc) for f in fnum: fname = [c,currc['basefname'],f,formatstring % (currc['basefname'],f)] currdata = pd.read_csv(fname[3]) allframenums=allframenums.append(currdata['frame'] % cyclelen) # do modular division by length of 1 cycle meanlocs = allframenums.value_counts(bins=range(-1,cyclelen)).sort_index().values/(len(fnum) * ncycles) os.makedirs(figfname + '/locsperframe_cycle/', exist_ok=True) np.savetxt(figfname + '/locsperframe_cycle/%s.csv' % c, meanlocs, delimiter=',') plt.figure(c) plt.title(config['conditions'][c]['title']) # don't plot localizations that occur within the pulses themselves #framenums = np.concatenate((np.arange(0,r),np.arange(r+v+fw,3*r+v),np.arange(3*r+v+g+fw,4*r))) framenums1 = np.arange(0,r) framenums2 = np.arange(r+v+fw,3*r+v) framenums3 = np.arange(3*r+v+g+fw,cyclelen) # corrected on 20230131 framenums_violet = np.arange(r+v,r+v+fw) framenums_green = np.arange(3*r+v+g,3*r+v+g+fw) #plt.plot(range(cyclelen),meanlocs) plt.plot(framenums1,meanlocs[framenums1],'k-') plt.plot(framenums2,meanlocs[framenums2],'k-') plt.plot(framenums3,meanlocs[framenums3],'k-') plt.plot(framenums_violet,meanlocs[framenums_violet],'-',color='blueviolet') plt.plot(framenums_green,meanlocs[framenums_green],'-',color='green') plt.xlabel('Frame number') plt.ylabel('Mean localization number') plt.savefig(figfname + '/locsperframe_cycle/%s.png' % c,format='png') def get_ND2_times(config): import re """ List ND2 movie file names with timestamps (Julian Date Number or JDN) args config : dict returns dictionary containing file names and timestamps for each movie in each condition """ rv = {} bytesfromend = 607000 # where to start reading in the metadata relative the end of the file # format string for movie file names formatstring = '%%s%%0%dd.%s' % (config['file_format']['ndigits'],config['file_format']['extension']) # Julian Date Number pattern to match in the metadata at the end of the ND2 file pattern = r"" for c in config['conditions']: rv[c] = [] currc = config['conditions'][c] if 'basefname' not in currc.keys(): raise Exception('You must include a basefname for every condition') else: fnum = get_condition_fnum(currc) for f in fnum: currfname = formatstring % (currc['basefname'],f) with open(currfname, "rb") as file: # Read the bytes at the end of the file file.seek(os.path.getsize(currfname)-bytesfromend) bytes = file.read() decoded = bytes.decode(errors='ignore') numbers = re.findall(pattern, decoded) #print(f"{currfname} {numbers}") if not numbers: rv[c].append([currfname,np.nan]) else: rv[c].append([currfname,float(numbers[0])]) return rv def getalldisp(fname): # returns a list of all single-molecule displacements from a file t = np.genfromtxt(fname, skip_header=1, delimiter=',') traj = np.unique(t[:, 17]) sz = 0 for j in range(traj.size): tcount = np.sum(t[:, 17] == traj[j]) if tcount > 1: sz = sz + tcount - 1 rv = np.zeros(sz) counter = 0 for j in range(traj.size): selector = (t[:, 17] == traj[j]) if np.sum(selector) > 1: currtraj = t[selector, :2] rv[counter:(counter + currtraj.shape[0] - 1)] = \ np.sqrt(np.power(currtraj[1:, 0] - currtraj[:-1, 0], 2) + np.power(currtraj[1:, 1] - currtraj[:-1, 1], 2)) counter = counter + currtraj.shape[0] - 1 return rv