Revision ec0d6943e09df2a685f8b1382475c523375352c3 authored by Xuelong Sun on 18 August 2021, 11:22:43 UTC, committed by GitHub on 18 August 2021, 11:22:43 UTC
1 parent 0c88709
insect_brain_model.py
# @File: insect_brain_model.py
# @Info: the modelling of insect brain, eg, the MushroomBody, the Central Complex etc.
# @Author: Xuelong Sun, UoL, UK
# @Time: 2020-02-17
import numpy as np
import random
from scipy.special import expit
# @Author: Xuelong Sun, UoL, UK
# @Time: 2020-08-20
# @Updated: wind direction encoding in Drosophila, Suver et.al 2019
class WindDirectionEncoder:
"""Class for the wind direction encoding ."""
def __init__(self):
self.apn = 0.0
self.b1 = 0.0
self.wpn = 0.0
def upwind_activation(self, current_heading, wind_dir):
theta = (wind_dir + np.pi - current_heading + np.pi) % (2.0 * np.pi) - np.pi
self.apn = np.sin(theta)
self.b1 = np.sin(-theta)
self.wpn = self.apn - self.b1
return self.wpn
class MushroomBodyModel(object):
"""Class for the MB of visual novelty learning ."""
def __init__(self, learning_rate, tau, num_pn=81, num_kc=4000, num_en=1):
# learning para
self.learning_rate = learning_rate
self.tau = tau
# network structure para
self.num_pn_per_kc = 10
self.num_stimuli = 80
self.num_pn = num_pn
self.num_kc = num_kc
self.num_en = num_en
# activation functions
self.af_pn = lambda x: np.maximum(x, 0)
self.af_kc = lambda x: np.float32(x > self.tau)
self.af_en = lambda x: np.maximum(x, 0)
# initializtion
self.stimuli = np.zeros(self.num_stimuli)
self.r_pn = np.zeros(self.num_pn)
self.r_kc = np.zeros(self.num_kc)
self.r_en = np.zeros(self.num_en)
# weights
self.w_pn2kc = generate_pn2kc_weights(self.num_pn,self.num_kc)
self.w_kc2en = np.ones([self.num_kc, self.num_en])
# learning control
self.reward = False
def generate_w_pn2kc(self):
w = np.zeros([self.num_pn, self.num_kc])
for i in range(self.num_kc):
random_index = random.sample(range(self.num_pn), self.num_pn_per_kc)
w[random_index, i] = 1.0
return w
def run(self, stimuli):
self.stimuli = stimuli
# PN directly receive the stimuli's input
self.r_pn = self.af_pn(self.stimuli)
# KC receive randomsly selected 'self.num_pn_per_kc' PNs input
self.r_kc = self.af_kc(self.r_pn.dot(self.w_pn2kc))
# EN summed all KCs's activation via the weights
self.r_en = self.af_en(self.r_kc.dot(self.w_kc2en))
if self.reward:
self.learning(self.r_kc)
return self.r_en
def learning(self, kc):
"""
THE LEARNING RULE:
----------------------------
KC | KC2EN(t)| KC2EN(t+1)
______|_________|___________
1 | 1 |=> 0
1 | 0 |=> 0
0 | 1 |=> 1
0 | 0 |=> 0
"""
temp = (kc >= self.w_kc2en[:, 0]).astype(bool)
self.w_kc2en[:, 0][temp] = np.maximum(self.w_kc2en[:, 0][temp] - self.learning_rate, 0)
def reset(self):
# initialization
self.stimuli = np.zeros(self.num_stimuli)
self.r_pn = np.zeros(self.num_pn)
self.r_kc = np.zeros(self.num_kc)
self.r_en = np.zeros(self.num_en)
# weights
self.w_pn2kc = self.generate_w_pn2kc()
self.w_kc2en = np.ones(self.num_kc, self.num_en)
# learning control
self.reward = False
class SuperiorMedialProtocerebrumModel(object):
"""Class for the modelling of the SMP neurons (TUN, SN1, SN2)
"""
def __init__(self, tun_k, sn_thr):
self.tun = 0.0
self.tun_k = tun_k
self.sn1 = 0.0
self.sn2 = 0.0
self.sn_thr = sn_thr
def neurons_output(self, mb_output):
# linear activation function of TUN
self.tun = np.min([mb_output * self.tun_k, 1])
# binary neurons (SN1, SN2)
self.sn2 = 1.0 if mb_output > self.sn_thr else 0.0
self.sn1 = 1.0 if self.sn2 == 0.0 else 0.0
return self.tun, self.sn1, self.sn2
class AOTuVisualPathwayModel(object):
"""
class for implementation the visual pathway: OL -> AOTU -> BU -> EB
Route following network generating RF desired heading
- artificial neural network with one hidden layer
"""
def __init__(self, x, y, num_neuron):
self.num_neurons = num_neuron
# initialize the network
self.input = x
self.y = y
self.sample_num = x.shape[0]
self.weight1 = np.random.rand(self.input.shape[1], self.num_neurons)
self.weight2 = np.random.rand(self.num_neurons, y.shape[1])
self.layer1_z = np.zeros([self.sample_num, self.num_neurons])
self.layer1_a = np.zeros([self.sample_num, self.num_neurons])
self.layer2_z = np.zeros(y.shape)
self.layer2_a = np.zeros(y.shape)
self.output = np.zeros(y.shape)
self.error = []
self.learning_rate = []
def forward_propagation(self):
self.layer1_z = np.dot(self.input, self.weight1)
self.layer1_a = sigmoid(self.layer1_z)
self.layer2_z = np.dot(self.layer1_a, self.weight2)
self.layer2_a = sigmoid(self.layer2_z)
self.output = self.layer2_a
def back_propagation(self, learning_rate=1.0):
delta = 2.0 * (self.y - self.output) * self.layer2_a * (1 - self.layer2_a)
delta_weight2 = np.dot(self.layer1_a.T, delta)
delta_weight1 = np.dot(self.input.T, np.dot(delta, self.weight2.T) * self.layer1_a * (1 - self.layer1_a))
self.weight1 += delta_weight1 * learning_rate
self.weight2 += delta_weight2 * learning_rate
def nn_output(self, x):
layer1 = sigmoid(np.dot(x, self.weight1))
out = sigmoid(np.dot(layer1, self.weight2))
# reverse the direction (+pi)
return np.roll(out, 4)
class CentralComplexModel(object):
"""Class for the CX of current heading (PB), cue integration (FB) and steering circuit (FB).
This implementation is adapted from Stone et.al 2017, https://doi.org/10.1016/j.cub.2017.08.052
"""
def __init__(self):
# global current heading
# tl2 neurons
self.tl2_prefs = np.tile(np.linspace(0, 2 * np.pi, 8, endpoint=False), 2)
self.tl2 = np.zeros(8)
# cl1 neurons
self.cl1 = np.zeros(8)
# I_tb1 neurons
self.I_tb1 = np.zeros(8)
# connection weights
# cl1 -> I_tb1
self.W_CL1_TB1 = np.tile(np.eye(8), 2)
# I_tb1 -> I_tb1
self.W_TB1_TB1 = gen_tb_tb_weights()
# local current heading
self.phase_prefs = np.linspace(0, 2 * np.pi, 8, endpoint=False)
# local compass neuron - II_tb1
self.II_tb1 = np.zeros(8)
# steering circuit
self.desired_heading_memory = np.zeros(16)
self.current_heading_memory = np.zeros(8)
# pre-motor neuron CPU1
self.cpu1a = np.zeros(14)
self.cpu1b = np.zeros(2)
self.cpu1 = np.zeros(16)
# motor[0] for left and motor[1] for right
self.motor = np.zeros(2)
# positive -> turn left, negative -> turn right
self.motor_value = 0
# connection weights from CPU1 to motor
self.W_CPU1a_motor = np.array([
[1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1]])
self.W_CPU1b_motor = np.array([[0, 1],
[1, 0]])
# connection weights to steering circuit
# current heading -> steering circuit
self.W_CH_CPU1a = np.tile(np.eye(8), (2, 1))[1:14 + 1, :]
self.W_CH_CPU1b = np.array([[0, 0, 0, 0, 0, 0, 0, 1],
[1, 0, 0, 0, 0, 0, 0, 0]])
# desired heading -> steering circuit
self.W_DH_CPU1a = np.array([
[0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0],
[0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0],
])
self.W_DH_CPU1b = np.array([
[1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], # 8
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1], # 9
])
# other para
self.noise = 0.0
def global_current_heading(self, theta):
# tl2
output = np.cos(theta - self.tl2_prefs)
self.tl2 = noisy_sigmoid(output, 6.8, 3.0, self.noise)
# cl1
self.cl1 = noisy_sigmoid(-self.tl2, 3.0, -0.5, self.noise)
# I_tb1
prop_cl1 = 0.667 # Proportion of input from CL1 vs TB1
prop_I_tb1 = 1.0 - prop_cl1
output = (prop_cl1 * np.dot(self.W_CL1_TB1, self.cl1) -
prop_I_tb1 * np.dot(self.W_TB1_TB1, self.I_tb1))
self.I_tb1 = noisy_sigmoid(output, 5.0, 0.0, self.noise)
return self.I_tb1
def local_current_heading(self, phase):
vc_mem_ring = np.cos(np.deg2rad(phase) - self.phase_prefs)
self.II_tb1 = 1 / (1 + np.exp(-vc_mem_ring * 3 - 1.0))
return self.II_tb1
def steering_circuit_out(self):
inputs = np.dot(self.W_DH_CPU1a, self.desired_heading_memory) * np.dot(self.W_CH_CPU1a,
1.0 - self.current_heading_memory)
self.cpu1a = noisy_sigmoid(inputs, 5.0, 2.5, self.noise)
inputs = np.dot(self.W_DH_CPU1b, self.desired_heading_memory) * np.dot(self.W_CH_CPU1b,
1.0 - self.current_heading_memory)
self.cpu1b = noisy_sigmoid(inputs, 5.0, 2.5, self.noise)
self.cpu1 = np.hstack([self.cpu1b[-1], self.cpu1a, self.cpu1b[0]])
motor = np.dot(self.W_CPU1a_motor, self.cpu1a)
motor += np.dot(self.W_CPU1b_motor, self.cpu1b)
self.motor = motor
self.motor_value = (self.motor[0] - self.motor[1]) * 0.25
return self.motor_value
class RingAttractorModel(object):
"""
class for the implementation of ring attractor network for cue integration
"""
def __init__(self):
# time to get stable state
self.time = 0.1 #s
self.dt = 1e-4
self.ti = 0.001 #s
self.nt = np.floor(self.time/self.dt)
self.ni = np.floor(self.ti/self.dt)
self.num_neuron = 8
self.neuron_pref = np.linspace(0, 360 - 360 / self.num_neuron, self.num_neuron).reshape(self.num_neuron, 1)
# neuron connections
self.W_E_E_K = 45 / self.num_neuron
self.W_E_E = self._generate_w_e2e()
self.W_I_E = 60 / self.num_neuron
self.W_E_I = -6.0
self.W_I_I = -1.0
# other parameters
self.gammaE = -1.5
self.gammaI = -7.5
self.tauE = 0.005
self.tauI = 0.00025
# neuron activations
self.integration_neuron = np.zeros(16)
self.inhibition_neuron = np.zeros(2)
def _generate_w_e2e(self):
wEE = np.zeros((self.num_neuron, self.num_neuron))
sigma = 130
for i in range(0, self.num_neuron):
for j in range(0, self.num_neuron):
diff = np.min([np.abs(self.neuron_pref[i] - self.neuron_pref[j]),
360 - np.abs(self.neuron_pref[i] - self.neuron_pref[j])])
wEE[i, j] = np.exp((-diff ** 2) / (2 * sigma ** 2))
return wEE * self.W_E_E_K
def cue_integration_output(self, cue1, cue2):
"""
update the ring attractor with cue1 and cue2 injected and return the stable state of the network as the
integrated output
:param cue1: the activation of cue1 - an array with size 16x1
:param cue2: the activation of cue1 - an array with size 16x1
:return: the optimal integration of cue1 and cue2, an array with the same size as cue1 and cue2
"""
pi_left = cue1[0:8]
pi_l = np.zeros((self.num_neuron, int(self.nt)))
pi_l[:, int(self.ni):] = np.repeat(pi_left.reshape(self.num_neuron, 1), int(self.nt - self.ni), axis=1)
pi_right = cue1[8:]
pi_r = np.zeros((self.num_neuron, int(self.nt)))
pi_r[:, int(self.ni):] = np.repeat(pi_right.reshape(self.num_neuron, 1), int(self.nt - self.ni), axis=1)
v_left = cue2[0:8]
v_l = np.zeros((self.num_neuron, int(self.nt)))
v_l[:, int(self.ni):] = np.repeat(v_left.reshape(self.num_neuron, 1), int(self.nt - self.ni), axis=1)
v_right = cue2[8:]
v_r = np.zeros((self.num_neuron, int(self.nt)))
v_r[:, int(self.ni):] = np.repeat(v_right.reshape(self.num_neuron, 1), int(self.nt - self.ni), axis=1)
# generate the array for integration cells and the uniform inhibitory cell
it_l = np.zeros((self.num_neuron, int(self.nt)))
it_l[:, 0] = 0.1 * np.ones(self.num_neuron, )
it_r = np.zeros((self.num_neuron, int(self.nt)))
it_r[:, 0] = 0.1 * np.ones(self.num_neuron, )
ul = np.zeros((1, int(self.nt)))
ur = np.zeros((1, int(self.nt)))
# iteration to the stable state
for t in range(1, int(self.nt)):
it_l[:, t] = it_l[:, t - 1] + (-it_l[:, t - 1] + np.max(
[np.zeros((self.num_neuron,)),
self.gammaE + np.dot(self.W_E_E, it_l[:, t - 1]) + self.W_E_I * ul[:, t - 1] + pi_l[:, t - 1]
+ v_l[:, t - 1]], axis=0)) * self.dt / self.tauE
ul[:, t] = ul[:, t - 1] + (
-ul[:, t - 1] + np.max([0, self.gammaI + self.W_I_E * np.sum(it_l[:, t - 1]) + self.W_I_I * ul[:, t - 1]],
axis=0)) * self.dt / self.tauI
it_r[:, t] = it_r[:, t - 1] + (-it_r[:, t - 1] + np.max(
[np.zeros((self.num_neuron,)),
self.gammaE + np.dot(self.W_E_E, it_r[:, t - 1]) + self.W_E_I * ur[:, t - 1] + pi_r[:, t - 1]
+ v_r[:, t - 1]], axis=0)) * self.dt / self.tauE
ur[:, t] = ur[:, t - 1] + (
-ur[:, t - 1] + np.max([0, self.gammaI + self.W_I_E * np.sum(it_r[:, t - 1]) + self.W_I_I * ur[:, t - 1]],
axis=0)) * self.dt / self.tauI
# get the final iteration as the output
# update the neuron activation
self.integration_neuron = np.hstack([it_l[:, -1], it_r[:, -1]])
self.integration_neuron = noisy_sigmoid(self.integration_neuron, 5.0, 2.5, 0)
self.inhibition_neuron = np.hstack([ul[-1], ur[-1]])
return self.integration_neuron
def generate_pn2kc_weights(nb_pn, nb_kc, min_pn=10, max_pn=20, aff_pn2kc=None, nb_trials=100000, baseline=25000,
rnd=np.random.RandomState(2018), dtype=np.float32):
"""
Create the synaptic weights among the Projection Neurons (PNs) and the Kenyon Cells (KCs).
Choose the first sample that has dispersion below the baseline (early stopping), or the
one with the lower dispersion (in case non of the samples' dispersion is less than the
baseline).
:param nb_pn: the number of the Projection Neurons (PNs)
:param nb_kc: the number of the Kenyon Cells (KCs)
:param min_pn:
:param max_pn:
:param aff_pn2kc: the number of the PNs connected to every KC (usually 28-34)
if the number is less than or equal to zero it creates random values
for each KC in range [28, 34]
:param nb_trials: the number of trials in order to find a acceptable sample
:param baseline: distance between max-min number of projections per PN
:param rnd:
:type rnd: np.random.RandomState
:param dtype:
"""
dispersion = np.zeros(nb_trials)
best_pn2kc = None
for trial in range(nb_trials):
pn2kc = np.zeros((nb_pn, nb_kc), dtype=dtype)
if aff_pn2kc is None or aff_pn2kc <= 0:
vaff_pn2kc = rnd.randint(min_pn, max_pn + 1, size=nb_pn)
else:
vaff_pn2kc = np.ones(nb_pn) * aff_pn2kc
# go through every kenyon cell and select a nb_pn PNs to make them afferent
for i in range(nb_pn):
pn_selector = rnd.permutation(nb_kc)
pn2kc[i, pn_selector[:vaff_pn2kc[i]]] = 1
# This selections mechanism can be used to restrict the distribution of random connections
# compute the sum of the elements in each row giving the number of KCs each PN projects to.
pn2kc_sum = pn2kc.sum(axis=0)
dispersion[trial] = pn2kc_sum.max() - pn2kc_sum.min()
# pn_mean = pn2kc_sum.mean()
# Check if the number of projections per PN is balanced (min max less than baseline)
# if the dispersion is below the baseline accept the sample
if dispersion[trial] <= baseline: return pn2kc
# cache the pn2kc with the least dispersion
if best_pn2kc is None or dispersion[trial] < dispersion[:trial].min():
best_pn2kc = pn2kc
# if non of the samples have dispersion lower than the baseline,
# return the less dispersed one
return best_pn2kc
def gen_tb_tb_weights(weight=1.):
"""Weight matrix to map inhibitory connections from TB1 to other neurons"""
W = np.zeros([8, 8])
sinusoid = -(np.cos(np.linspace(0, 2 * np.pi, 8, endpoint=False)) - 1) / 2
for i in range(8):
values = np.roll(sinusoid, i)
W[i, :] = values
return weight * W
def noisy_sigmoid(v, slope=1.0, bias=0.5, noise=0.01):
"""Takes a vector v as input, puts through sigmoid and
adds Gaussian noise. Results are clipped to return rate
between 0 and 1"""
sig = expit(v * slope - bias)
if noise > 0:
sig += np.random.normal(scale=noise, size=len(v))
return np.clip(sig, 0, 1)
def sigmoid(x, deriv=False):
if (deriv == True):
return x * (1 - x)
return 1 / (1 + np.exp(-x))
Computing file changes ...