https://github.com/opnumten/trajectory_analysis
Tip revision: 1153076f88acab6f1030eefa119f4e6bd49cbb97 authored by opnumten on 10 January 2022, 07:34:23 UTC
segmentation_correction_tracking
segmentation_correction_tracking
Tip revision: 1153076
reactive_traj_classify.py
import numpy as np
import os
from os import listdir
import pandas as pd
from scipy.stats import kde
import seaborn as sns
import copy
from math import exp,log
import pickle
import scipy.ndimage as ndimage
import scipy.interpolate.fitpack as fitpack
from sklearn import manifold,decomposition,random_projection,cluster,metrics,preprocessing,mixture,model_selection
from sklearn.preprocessing import StandardScaler,MinMaxScaler,RobustScaler
import scipy.io as sio
from statsmodels.tsa.ar_model import AR
from scipy import signal
def consecutive_arrs(data, stepsize=3):
return np.split(data, np.where(np.diff(data)>stepsize)[0]+1)
def autocorr(x, range=144,step=1):
return np.array([1]+[np.corrcoef(x[:-i], x[i:])[0,1] for i in np.arange(1, range,step)])
def find_reaction_start_end(traj_state,dwell_thres=6):
reaction_start=traj_state.shape[0]
reaction_end=0
enter_M=0
E_arrs=consecutive_arrs(np.where(traj_state==0)[0])
M_arrs=consecutive_arrs(np.where(traj_state==2)[0])
for m in range(len(E_arrs)):
if E_arrs[m].shape[0]>dwell_thres:
reaction_start=E_arrs[m][0]
break
n=len(M_arrs)
while n>=1:
if M_arrs[n-1].shape[0]>dwell_thres:
reaction_end=M_arrs[n-1][-1]
break
n-=1
for k in range(len(M_arrs)):
if M_arrs[k].shape[0]>dwell_thres:
enter_M=M_arrs[k][0]
break
return reaction_start,reaction_end,enter_M
def autocorr(x, lag_range,step=1):
return np.array([1]+[np.corrcoef(x[:-i], x[i:])[0,1] for i in np.arange(1,lag_range,step)])
def sliding_window_ar1_coef(x,window_size,t_range,step=1):
return np.array([(AR(x[t:t+window_size]).fit(1)).params[1] for t in np.arange(0,t_range,step)])
def ar1_tipping_time(reaction_traj,window_size_p=4,t_range_p=2):
ar1_t=[]
window_size=reaction_traj.shape[0]//window_size_p
t_range=reaction_traj.shape[0]//t_range_p
for j in range(reaction_traj.shape[1]):
traj_ar1=sliding_window_ar1_coef(reaction_traj[:,j], window_size,t_range)
peaks, _ = signal.find_peaks(traj_ar1)
max_peak=peaks[np.argmax(traj_ar1[peaks])]
ar1_t.append(max_peak+window_size)
# ar1_t.append(np.argmax(traj_ar1)+window_size)
return ar1_t
def cross_corr_delay(reaction_traj,d1=0,d2=1):
n=reaction_traj.shape[0]
y1=reaction_traj[:,d1]
y2=reaction_traj[:,d2]
corr0 = signal.correlate(y1, y2, mode='full') / np.sqrt(np.correlate(y1, y1)*np.correlate(y2, y2))
corr_range=n//2
corr=corr0[(n-corr_range)+1:corr_range+n]
# corr=abs(corr)
if np.argmax(corr)>=corr_range:
print('vim first',np.argmax(corr)-corr_range)
cc_lag=np.argmax(corr)-corr_range
max_corr=corr[np.argmax(corr)]
else:
print('morph first',-(corr_range-np.argmax(corr)))
cc_lag=-(corr_range-np.argmax(corr))
max_corr=corr[np.argmax(corr)]
return cc_lag,max_corr
def find_intermediate_part(ls_score):
trans_state=ls_score
reactive_inds=[]
P_arrs=consecutive_arrs(np.where(trans_state==1)[0])
for m in range(len(P_arrs)):
if P_arrs[m].shape[0]>0:
if P_arrs[m][-1]<(ls_score.shape[0]-1) and P_arrs[m][0]>0:
if trans_state[P_arrs[m][-1]+1]==2 and trans_state[P_arrs[m][0]-1]==0:
reactive_inds.append(P_arrs[m])
return reactive_inds
# def cluster_points(X, mu):
# cluster_labels=np.zeros((X.shape[0]))
# for x in X:
# bestmukey = min([(i[0], np.linalg.norm(x-mu[i[0]])) for i in enumerate(mu)], key=lambda t:t[1])[0]
# cluster_labels[np.where(X==x)[0]]=bestmukey
# return cluster_labels
# def sliding_window_ar1(x,window_size,step=1):
# return np.array([np.corrcoef(x[t:t+window_size], x[t+1:t+1+window_size])[0,1] for t in np.arange(0,x.shape[0]-window_size-1,step)])
# def sliding_window_std(x,window_size,step=1):
# return np.array([np.std(x[t:t+window_size]) for t in np.arange(0,x.shape[0]-window_size,step)])