#!/usr/bin/env python2.4
# -*- coding: utf-8 -*-
# 
# Copyright (c) 2006 Stian Soiland
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# Author: Stian Soiland <stian@soiland.no>
# URL: http://soiland.no/i/src/
# License: MIT
#
"""Continuous time recurrent neural network.

"""


from math import exp, sqrt, pi, tanh
import operator
import logging

try:
    import numpy
except ImportError:    
    # Try to go old-fashioned
    import Numeric as numpy


# Hackish, generate IEEE 754 special values
#inf = 1e300 * 1e300
#nan = inf - inf

def identity(x):
    """Identity transfer function"""
    return x

def step(x, limit=0.4):
    """Step transfer function"""
    if x < limit:
        return 0.0
    return 1.0

def sigmoid(x, gain=1.0, bias=0.0):
    """Sigmoidal transfer function."""
    try:
        return 1.0 / (1.0 + exp(-gain * (x-bias)))
    except OverflowError:
        return 0.0

def dsigmoid(x):
    """Derivative of sigmoid()"""
    f = sigmoid(x)
    return f * (1-f)

def gaussian(x, mu=0.0, sigma=1.0):
    """Gaussian transfer function"""
    return  (1 / (sqrt(2*pi) * sigma)) * exp( -0.5 * ( (x-mu)/sigma ) ** 2 )

def dgaussian(x, mu=0.0, sigma=1.0):
    """Derivative of gaussian(), parameters as for gaussian()"""
    return -((x - mu) / sigma**2) * gaussian(x, mu, sigma)

def signum(x):
    if x < 0.0:
        return -1.0
    return 1.0

# The same as math.tanh
#def tansig(x):
#    return ( 2/ (1+exp(-2*x)) ) -1

def dtansig(x):
    """Derivative of tanh()"""
    f = tanh(x)
    return 1-(f**2)

def tanh_pos(x):
    """tanh with lower limit of 0.0"""
    if x < 0.0:
        return 0.0
    return tanh(x)


class Layers(object):
    """Layers for constructing and accessing a CTRNN.
    
    A layer is a set of neurons. A network is made of several layers.
    Unlike in feed-forward networks, there is no rule preventing neurons
    within the same layer from being interconnected. In addition,
    timestep calculation is done globally, not per layer.
   
    These layers can be used to group neurons into different layers for
    easier access to their parameters. For instance, a network can
    consist of an input layer, a hidden layer, and an output layer,
    which can have different weights and time constants.  Instead of
    remembering that the hidden neurons are in the range 15 to 26 and
    using these offsets in all code, the layer can be accessed as the
    attribute "hidden".

    Construction works by adding layers using add_layer(). When all the
    layers are ready, build_net() is called. After this, no more layers
    can be added; the complete CTRNN network is available as the
    attribute .net, and each layer as an attribute under its own name.

    Example:
        layers = Layers()
        layers.add_input_layer("input", 3)
        layers.add_layer("hidden", 5)
        layers.add_layer("output", 2)
        layers.build_net()
        assert len(layers.net.potential) == 3+5+2
         
        layers.input.set_inputs((10, -5, 15))
        layers.hidden.timeconst[3] = 5
        layers.hidden.weight[0, 0] = 13
    """
    
    def __init__(self):
        self.layers = []
        self.input_layers = set()
        self.neurons = 0
        self.net = None

    def add_input_layer(self, name, neurons):
        """Add the input layer with the given name.

        Like add_layer(), but input layers will have their transfer
        function set to identity(), and their timeconstant will be 1. 
        """
        self.add_layer(name, neurons)
        self.input_layers.add(name)

    def add_layer(self, name, neurons):
        if self.net:
            raise RuntimeError("Cannot add layers after build_net()")
        if hasattr(self, name):
            raise AttributeError("already exists: %s" % name)
        # Placeholder until build_net   
        setattr(self, name, None)    
        min = self.neurons  # Inclusive
        self.neurons += neurons
        max = min + neurons # exclusive 
        self.layers.append((name, slice(min,max)))

    def build_net(self, timeconst=1.0):
        """Build the complete CTRNN from the added layers.

        After this, the network is available as the attribute .net, and
        each layer as an attribute named after it.
        """
        self.net = CTRNN(self.neurons, timeconst)
        for (name, slice) in self.layers:
            if name in self.input_layers:
                layer = _InputLayer(self.net, slice)
            else:
                layer = _Layer(self.net, slice)
            setattr(self, name, layer)


class _Layer(object):
    """Proxy access to network parameters for the given layer.

    Properties such as bias, output, potential and timeconst are sliced
    out to refer to the current layer only. Note that because these are
    numpy array slices, they are also assignable, and changes are
    reflected in the full network.

    The property ''weight'' is the part of the weight matrix for
    connections going *to* this layer, from *all* neurons. The matrix
    thus has n rows and m columns, where n is the number of neurons in
    the whole network, and m is the number of neurons in this layer.

    Examples:

        # Set the weight of the connection from network neuron number 14
        # (global index) to neuron 2 in this layer.
        layer.weight[14,2] = 0.7

        # Get global index for usage in another layer.weight
        index = layer[2] 

        # Check the output of the layer
        print layer.output

        # Set all the biases at once (assumed layer size 4)
        layer.bias[:] = [0,1,2,3]

    """
    def __init__(self, net, slice):
        self.net = net
        self.slice = slice
    
    def __len__(self):
        step = self.slice.step or 1
        return (self.slice.stop - self.slice.start) / step
    
    def __getitem__(self, item):
        step = self.slice.step or 1
        index = item * step
        if index < 0:
            index = self.slice.stop + index
        else:    
            index = self.slice.start + index
        if index < self.slice.start:
            raise IndexError, item
        if index >= self.slice.stop:
            raise IndexError, item
        return index    
    
    def calc_timestep(self):
        self.net.calc_timestep(self.slice)
    
    def _slicer(self, array):
        # Note: This only works for assignments because the arrays are
        # numpy arrays, where slices are views into the original data
        # rather than copies
        assert isinstance(array, type(self.net.potential))
        return array[self.slice]

    @property
    def bias(self):
        return self._slicer(self.net.bias)
               
    @property
    def output(self):
        return self._slicer(self.net.output)
            
    @property
    def potential(self):
        return self._slicer(self.net.potential)
        
    @property
    def timeconst(self):
        return self._slicer(self.net.timeconst)

    @property
    def transfer(self):
        return self._slicer(self.net.transfer)
    
    def set_transfer(self, transfer):
        """Set the transfer function for the whole layer.
        """
        self.transfer[:] = [transfer] * len(self)

    @property
    def weight(self):
        # We return the weights pointing TO this neuron, 
        # ie. where second dimension is our slice
        return self.net.weight[:,self.slice]

class _InputLayer(_Layer):
    def __init__(self, net, slice):
        super(_InputLayer, self).__init__(net, slice)
        self.fix()
    
    # FIXME: Separate the input layer from the "real" network
    def fix(self):
        """Make sure there is no monkey business going on"""
        self.set_transfer(identity)
        self.timeconst[:] = [1.0] * len(self)
        # Zero all incoming weights to this layer (scalar broadcast)
        self.weight[:] = 0.0

    def set_inputs(self, inputs):
        """Set the given inputs by applying them as biases to this layer."""
        self.bias[:] = inputs
        self.calc_timestep()
        # If this fails, you have messed up the weights, timeconstants
        # or transfers of this input layer.
        assert (self.output == inputs).all()


class CTRNN(object):
    """Continuous time recurrent neural network.
    """

    def __init__(self, neurons, timeconst=1.0, transfer=sigmoid,
                 timestep=1.0):
        """Construct a new CTRNN of the given number of neurons.

        Optional parameter timeconst is the time constant for neurons,
        by default 1.0.  Individual time constants can later be modified
        through self.timeconst.

        Optional parameter transfer is the default transfer function.
        Transfer functions can be set individually for each neuron by
        the list self.transfer.

        Optional parameter timestep is the size of the timestep 
        (delta T), by default 1. Note that for the network to be stable,
        the timestep size must be less than double the smallest
        timeconstant in the network, ie. if the smallest timeconstant is
        2, the timestep must be less than 4.
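
        A brief sketch of per-neuron customization (values here are
        illustrative):

            net = CTRNN(4, timeconst=2.0)
            net.timeconst[2] = 5.0   # give neuron 2 a slower time constant
            net.transfer[3] = tanh   # a different transfer function for neuron 3
            net.bias[:] = [0.0, 0.5, -0.5, 1.0]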
        """
        # Prepare logging
        self.log = logging.getLogger("ctrnn")
        if not (self.log.handlers or logging.root.handlers):
            logging.basicConfig()

        self.num_neurons = neurons

        # NOTE: All arrays are numpy.array-s - which enables
        # by-reference slicing

        # internal state, membrane potential
        # (the 'd' typecode means double precision and works with both
        # numpy and the old Numeric module)
        self.potential = numpy.zeros(neurons, 'd')
        # Our timestep \Delta t must be smaller than twice the smallest
        # timeconst
        self.timestep = timestep
        assert 2*timeconst > timestep
        # By default, no bias
        self.bias = numpy.zeros(neurons, 'd')
        self.timeconst = numpy.array([float(timeconst)] * neurons)
        self.weight = numpy.zeros((neurons, neurons), 'd')
        self.transfer = numpy.array([transfer] * neurons)
        # Output values as calculated by calc_timestep()
        self.output = numpy.zeros(neurons, 'd')
    
    def connect_all(self, weight=None, func=None, ref_self=False):
        """Connect all neurons using the specified weight or function.

        Either weight or func must be specified, but not both.

        If weight is specified, it is the scalar weight assigned to all
        connections.

        If func is specified, it is assumed to be a function taking no
        arguments. The function will be called for each connection, 
        and the result is assigned to the connection. No order can be
        assumed, so this function is normally something like
        random.random.

        If ref_self is set to True, self-looping weights will also be
        assigned between neuron n and neuron n, otherwise they will be
        assigned 0.0.
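
        Example (illustrative; weights drawn uniformly from [-1, 1]):

            import random
            net.connect_all(func=lambda: random.uniform(-1, 1))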
        """
        assert ((weight is None and func) or 
                (weight is not None and not func))
        for n in range(self.num_neurons):
            for m in range(self.num_neurons):
                if n == m and not ref_self:
                    w = 0.0
                elif func:
                    w = func()    
                else:
                    w = weight    
                self.weight[n,m] = w

    def set_transfer(self, transfer):
        """Set the transfer function for the whole network.
        """
        self.transfer[:] = [transfer] * len(self.transfer)
    
    def calc_timestep(self, slicing=None):
        """Calculate the next timestep.

        If slicing is given, it is assumed a slice object from which
        neurons are to be updated. Otherwise, all neurons are updated.
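
        For each updated neuron i, the forward Euler update performed is:

            potential[i] += timestep/timeconst[i] *
                (-potential[i] + sum_j(output[j] * weight[j,i]) + bias[i])
            output[i] = transfer[i](potential[i])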
        """
        # We do this as nice matrix operations
        if slicing is None:
            # slice(None) is equivalent to [:], ie. all neurons
            slicing = slice(None)
        # Weighted inputs: inputs[i] = sum_j output[j] * weight[j,i]
        inputs = numpy.dot(self.output, self.weight[:,slicing])
        change = self.timestep/self.timeconst[slicing] * (-self.potential[slicing] + 
                                                 inputs + self.bias[slicing])
        self.potential[slicing] += change
        self.output[slicing] = [f(x) for f,x in 
                zip(self.transfer[slicing], self.potential[slicing])]
    
    def stabilize(self, max_steps=200, precision=None):
        """Run the network until it stabilizes.

        A network is considered stable if the output does not change
        from one timestep to the next. (Note that depending on the
        transfer function, potentials can still change by this
        definition)

        If parameter max_steps is supplied, it is the maximum number of
        timesteps to run, by default 200.

        If parameter precision is supplied, it specifies the largest
        allowed change in any single output from one timestep to the
        next for the network to be considered stabilized. By default,
        full stabilization is required, meaning the IEEE 754 double
        precision subtraction must return exactly 0.0 for every output.

        Return the number of time steps used to stabilize, or None
        if the network did not stabilize within the maximum number of
        steps.
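
        Example (illustrative tolerance):

            steps = net.stabilize(max_steps=500, precision=1e-6)
            if steps is None:
                print "Did not stabilize"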
        """
        
        for x in xrange(max_steps):
            # Make a copy that is not changed by calc_timestep
            prev = list(self.output)
            self.calc_timestep() 
            diff = numpy.subtract(prev, self.output)
            if precision is None:
                # All must be 0
                if not numpy.sometrue(diff):
                    self.log.info("Stabilized in %s timesteps", x)
                    return x
            else:
                # The maximal difference must be less than precision
                span = numpy.absolute(diff).max()
                if span <= precision:
                    return x

        
        return None
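

# A minimal demo / self-test sketch (values are illustrative): build a small
# two-layer network with the Layers helper, wire it with random weights, and
# run it until the output settles.
if __name__ == "__main__":
    import random
    layers = Layers()
    layers.add_input_layer("input", 2)
    layers.add_layer("output", 3)
    layers.build_net(timeconst=2.0)
    layers.net.connect_all(func=lambda: random.uniform(-1, 1))
    # connect_all() also assigns random weights *into* the input layer,
    # so re-zero them before feeding inputs
    layers.input.fix()
    layers.input.set_inputs((0.5, -0.5))
    steps = layers.net.stabilize(max_steps=500, precision=1e-6)
    print "Output after %s timesteps: %s" % (steps, layers.net.output)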
    