https://github.com/PeyracheLab/pynacollada
Raw File
Tip revision: 4dd1adddad54627601f7567354fa7f0af020fc7d authored by Guillaume Viejo on 05 July 2023, 20:05:21 UTC
Merge pull request #22 from slcalgin/main
Tip revision: 4dd1add
neural_tuning.py
# -*- coding: utf-8 -*-
# @Author: gviejo
# @Date:   2022-01-17 14:10:40
# @Last Modified by:   gviejo
# @Last Modified time: 2022-01-18 17:40:08
import pynapple as nap
import pandas as pd
import numpy as np

def computePlaceFields(spikes, position, ep, nb_bins = 100, frequency = 120.0, smoothing = 2):
    place_fields = {}
    position_tsd = position.restrict(ep)
    xpos = position_tsd.iloc[:,0]
    ypos = position_tsd.iloc[:,1]
    xbins = np.linspace(xpos.min(), xpos.max()+1e-6, nb_bins+1)
    ybins = np.linspace(ypos.min(), ypos.max()+1e-6, nb_bins+1)    
    for n in spikes:
        position_spike = position_tsd.realign(spikes[n].restrict(ep))
        spike_count,_,_ = np.histogram2d(position_spike.iloc[:,1].values, position_spike.iloc[:,0].values, [ybins,xbins])
        occupancy, _, _ = np.histogram2d(ypos, xpos, [ybins,xbins])
        mean_spike_count = spike_count/(occupancy+1)
        place_field = mean_spike_count*frequency    
        place_fields[n] = pd.DataFrame(index = ybins[0:-1][::-1],columns = xbins[0:-1], data = place_field)
        
    extent = (xbins[0], xbins[-1], ybins[0], ybins[-1]) # USEFUL FOR MATPLOTLIB
    return place_fields, extent

def computeOccupancy(position, ep, nb_bins = 100):
    xpos = position.restrict(ep).iloc[:,0]
    ypos = position.restrict(ep).iloc[:,1]    
    xbins = np.linspace(xpos.min(), xpos.max()+1e-6, nb_bins+1)
    ybins = np.linspace(ypos.min(), ypos.max()+1e-6, nb_bins+1)
    occupancy, _, _ = np.histogram2d(ypos, xpos, [ybins,xbins])
    return occupancy

def findHDCells(tuning_curves, z = 50, p = 0.0001 , m = 1):
	"""
		Peak firing rate larger than 1
		and Rayleigh test p<0.001 & z > 100
	"""
	cond1 = tuning_curves.max()>m
	from pycircstat.tests import rayleigh
	stat = pd.DataFrame(index = tuning_curves.columns, columns = ['pval', 'z'])
	for k in tuning_curves:
		stat.loc[k] = rayleigh(tuning_curves[k].index.values, tuning_curves[k].values)
	cond2 = np.logical_and(stat['pval']<p,stat['z']>z)
	tokeep = stat.index.values[np.where(np.logical_and(cond1, cond2))[0]]
	hd = pd.Series(index = tuning_curves.columns, data = 0)
	hd.loc[tokeep] = 1
	return hd, stat

def computeAngularVelocityTuningCurves(spikes, angle, ep, nb_bins = 61, bin_size = 10000, norm=True):
	tmp 			= pd.Series(index = angle.index.values, data = np.unwrap(angle.values))
	tmp2 			= tmp.rolling(window=100,win_type='gaussian',center=True,min_periods=1).mean(std=10.0)
	time_bins		= np.arange(tmp.index[0], tmp.index[-1]+bin_size, bin_size) # assuming microseconds
	index 			= np.digitize(tmp2.index.values, time_bins)
	tmp3 			= tmp2.groupby(index).mean()
	tmp3.index 		= time_bins[np.unique(index)-1]+bin_size/2
	tmp3 			= nap.Tsd(tmp3)
	tmp4			= np.diff(tmp3.values)/np.diff(tmp3.as_units('s').index.values)
	tmp2 			= nap.Tsd(tmp2)
	tmp4			= np.diff(tmp2.values)/np.diff(tmp2.as_units('s').index.values)	
	velocity 		= nap.Tsd(t=tmp2.index.values[1:], d = tmp4)
	velocity 		= velocity.restrict(ep)	
	bins 			= np.linspace(-2*np.pi, 2*np.pi, nb_bins)
	idx 			= bins[0:-1]+np.diff(bins)/2
	velo_curves		= pd.DataFrame(index = idx, columns = list(spikes.keys()))

	for k in spikes:
		spks 		= spikes[k]
		spks 		= spks.restrict(ep)
		speed_spike = velocity.realign(spks)
		spike_count, bin_edges = np.histogram(speed_spike, bins)
		occupancy, _ = np.histogram(velocity.restrict(ep), bins)
		spike_count = spike_count/(occupancy+1)
		velo_curves[k] = spike_count*(1/(bin_size*1e-6))
		# normalizing by firing rate 
		if norm:
			velo_curves[k] = velo_curves[k]/(len(spikes[k].restrict(ep))/ep.tot_length('s'))

	return velo_curves



#seems there are 2 functions with the same name (see above)
def smoothAngularTuningCurves(tuning_curves, window = 20, deviation = 3.0):
    for i in tuning_curves.columns:
        tcurves = tuning_curves[i]
        padded     = pd.Series(index = np.hstack((tcurves.index.values-(2*np.pi),
                                                tcurves.index.values,
                                                tcurves.index.values+(2*np.pi))),
                            data = np.hstack((tcurves.values, tcurves.values, tcurves.values)))
        smoothed = padded.rolling(window=window,win_type='gaussian',center=True,min_periods=1).mean(std=deviation)        
        tuning_curves[i] = smoothed[tcurves.index]

    return tuning_curves
def smoothAngularTuningCurves(tuning_curves, window = 20, deviation = 3.0):
	new_tuning_curves = {}	
	for i in tuning_curves.columns:
		tcurves = tuning_curves[i]
		offset = np.mean(np.diff(tcurves.index.values))
		padded 	= pd.Series(index = np.hstack((tcurves.index.values-(2*np.pi)-offset,
												tcurves.index.values,
												tcurves.index.values+(2*np.pi)+offset)),
							data = np.hstack((tcurves.values, tcurves.values, tcurves.values)))
		smoothed = padded.rolling(window=window,win_type='gaussian',center=True,min_periods=1).mean(std=deviation)		
		new_tuning_curves[i] = smoothed.loc[tcurves.index]

	new_tuning_curves = pd.DataFrame.from_dict(new_tuning_curves)

	return new_tuning_curves



def computeSpeedTuningCurves(spikes, position, ep, bin_size = 0.1, nb_bins = 20, speed_max = 0.4):
	time_bins 	= np.arange(position.index[0], position.index[-1]+bin_size*1e6, bin_size*1e6)
	index 		= np.digitize(position.index.values, time_bins)
	tmp 		= position.groupby(index).mean()
	tmp.index 	= time_bins[np.unique(index)-1]+(bin_size*1e6)/2
	distance	= np.sqrt(np.power(np.diff(tmp['x']), 2) + np.power(np.diff(tmp['z']), 2))
	speed 		= nap.Tsd(t = tmp.index.values[0:-1]+ bin_size/2, d = distance/bin_size)
	speed 		= speed.restrict(ep)
	bins 		= np.linspace(0, speed_max, nb_bins)
	idx 		= bins[0:-1]+np.diff(bins)/2
	speed_curves = pd.DataFrame(index = idx,columns = np.arange(len(spikes)))
	for k in spikes:
		spks 	= spikes[k]
		spks 	= spks.restrict(ep)
		speed_spike = speed.realign(spks)
		spike_count, bin_edges = np.histogram(speed_spike, bins)
		occupancy, _ = np.histogram(speed, bins)
		spike_count = spike_count/(occupancy+1)
		speed_curves[k] = spike_count/bin_size

	return speed_curves


def computeAccelerationTuningCurves(spikes, position, ep, bin_size = 0.1, nb_bins = 40):
	time_bins 	= np.arange(position.index[0], position.index[-1]+bin_size*1e6, bin_size*1e6)
	index 		= np.digitize(position.index.values, time_bins)
	tmp 		= position.groupby(index).mean()
	tmp.index 	= time_bins[np.unique(index)-1]+(bin_size*1e6)/2
	distance	= np.sqrt(np.power(np.diff(tmp['x']), 2) + np.power(np.diff(tmp['z']), 2))
	speed 		= nap.Tsd(t = tmp.index.values[0:-1]+ bin_size/2, d = distance/bin_size)
	speed 		= speed.restrict(ep)
	speed 		= speed.as_series()
	speed2 		= speed.rolling(window=10, win_type='gaussian', center= True, min_periods=1).mean(std = 1.0)
	accel 		= nap.Tsd(t = speed2.index.values[0:-1] + np.diff(speed2.index.values)/2, d = np.diff(speed2.values))	
	bins 		= np.linspace(accel.min(), accel.max(), nb_bins)
	idx 		= bins[0:-1]+np.diff(bins)/2
	accel_curves = pd.DataFrame(index = idx,columns = np.arange(len(spikes)))
	for k in spikes:
		spks 	= spikes[k]
		spks 	= spks.restrict(ep)
		accel_spike = accel.realign(spks)
		spike_count, bin_edges = np.histogram(accel_spike, bins)
		occupancy, _ = np.histogram(accel, bins)
		spike_count = spike_count/(occupancy+1)
		accel_curves[k] = spike_count/bin_size

	return accel_curves


def centerTuningCurves(tcurve):
	"""
	center tuning curves by peak
	"""
    
    #Added by Adrien on Dec 21st 2021
    from pycircstat.descriptive import mean as circmean

	peak = pd.Series(index=tcurve.columns,data = np.array([circmean(tcurve.index.values, tcurve[i].values) for i in tcurve.columns]))
	new_tcurve = []
	for p in tcurve.columns:	
		x = tcurve[p].index.values - tcurve[p].index[tcurve[p].index.get_loc(peak[p], method='nearest')]
		x[x<-np.pi] += 2*np.pi
		x[x>np.pi] -= 2*np.pi
		tmp = pd.Series(index = x, data = tcurve[p].values).sort_index()
		new_tcurve.append(tmp.values)
	new_tcurve = pd.DataFrame(index = np.linspace(-np.pi, np.pi, tcurve.shape[0]+1)[0:-1], data = np.array(new_tcurve).T, columns = tcurve.columns)
	return new_tcurve

# copied to neural_tuning
def offsetTuningCurves(tcurve, diffs):
	"""
	offseting tuning curves synced by diff
	"""	
	new_tcurve 		= []
	for p in tcurve.columns:	
		x = tcurve[p].index.values - tcurve[p].index[tcurve[p].index.get_loc(diffs[p], method='nearest')]
		x[x<-np.pi] += 2*np.pi
		x[x>np.pi] -= 2*np.pi
		tmp = pd.Series(index = x, data = tcurve[p].values).sort_index()
		new_tcurve.append(tmp.values)
	new_tcurve = pd.DataFrame(index = np.linspace(-np.pi, np.pi, tcurve.shape[0]+1)[0:-1], data = np.array(new_tcurve).T, columns = tcurve.columns)
	return new_tcurve


def computeSpeed(position, ep, bin_size = 0.1):
	time_bins 	= np.arange(position.index[0], position.index[-1]+bin_size*1e6, bin_size*1e6)
	index 		= np.digitize(position.index.values, time_bins)
	tmp 		= position.groupby(index).mean()
	tmp.index 	= time_bins[np.unique(index)-1]+(bin_size*1e6)/2
	distance	= np.sqrt(np.power(np.diff(tmp['x']), 2) + np.power(np.diff(tmp['z']), 2))
	speed 		= nts.Tsd(t = tmp.index.values[0:-1]+ bin_size/2, d = distance/bin_size)
	speed 		= speed.restrict(ep)
	return speed

#########################################################
# INTERPOLATION
# ########################################################
def interpolate(z, x, y, inter, bbox = None):	
	import scipy.interpolate
	xnew = np.arange(x.min(), x.max()+inter, inter)
	ynew = np.arange(y.min(), y.max()+inter, inter)
	if bbox == None:
		f = scipy.interpolate.RectBivariateSpline(y, x, z)
	else:
		f = scipy.interpolate.RectBivariateSpline(y, x, z, bbox = bbox)
	znew = f(ynew, xnew)
	return (xnew, ynew, znew)

def filter_(z, n):
	from scipy.ndimage import gaussian_filter	
	return gaussian_filter(z, n)
back to top