Raw File
gradkern.py
import numpy as np
import tensorflow as tf
import GPflow
from GPflow.param import Param
from GPflow import transforms
import pdb

import matplotlib.pyplot as plt

from functools import partial

from multikernel import BlockKernel

from GPflow._settings import settings
float_type = settings.dtypes.float_type


def find_partitions(collection):
    '''Take an iterable `collection` and return a generator which produces
    all unique partitions as lists-of-lists with every object being a member of
    one set per partition.
    alexis @ http://stackoverflow.com/questions/19368375/set-partitions-in-python
    '''
    if len(collection) == 1:
        yield [ collection ]
        return

    first = collection[0]
    for smaller in find_partitions(collection[1:]):
        # insert `first` in each of the subpartition's subsets
        for n, subset in enumerate(smaller):
            yield smaller[:n] + [[ first ] + subset]  + smaller[n+1:]
        # put `first` in its own subset
        yield [ [ first ] ] + smaller

class StationaryGradKernel(BlockKernel):
    ''' Abstract kernel class which allows arbitrary derivative observations for
    any differentiable stationary kernel. Computes each kernel matrix as a
    composition of block matrices and automatically generates appropriate kernel
    functions if given derivatives of the kernel matrix wrt squared distance.

    To extend, inherit and replace `derivative_base` attribute with list of
    the 0th, 1st, and 2nd derivative of desired kernel k(tau) with respect to squared
    distance tau. Defaults to RBF/squared exponential.
    '''
    def __init__(self, input_dim, active_dims = None):
        groups = input_dim
        BlockKernel.__init__(self, input_dim, groups, active_dims)
        self.variance = GPflow.param.Param(0.1, transforms.positive)
        self.lengthscales = GPflow.param.Param(1.*np.ones(input_dim-1), transforms.positive)

        self.derivative_base = [lambda tau: self.variance*tf.exp(-0.5*tau), lambda tau: -0.5*self.variance*tf.exp(-0.5*tau), lambda tau: 0.25*self.variance*tf.exp(-0.5*tau)]
        self.kerns = [[self._kernel_factory(i,j) for j in range(groups)] for i in range(groups)]

    def subK(self, index, X, X2 = None):
        i, j = index
        if X2 is None:
            X2 = X
        return self.kerns[i][j](X, X2)

    def subKdiag(self, index, X):
        K  = self.subK((index, index), X, X)
        return tf.diag_part(K)

    def square_dist(self, X, X2):
        '''Function `squared_dist` identical to method in `Stationary`.'''
        X = X / self.lengthscales
        Xs = tf.reduce_sum(tf.square(X), 1)
        if X2 is None:
            return -2 * tf.matmul(X, tf.transpose(X)) + \
                   tf.reshape(Xs, (-1, 1)) + tf.reshape(Xs, (1, -1))
        else:
            X2 = X2 / self.lengthscales
            X2s = tf.reduce_sum(tf.square(X2), 1)
            return -2 * tf.matmul(X, tf.transpose(X2)) + \
                   tf.reshape(Xs, (-1, 1)) + tf.reshape(X2s, (1, -1))

    def square_dist_d(self, X, X2, diff_x=(), diff_y=()):
        '''Take derivative of square_dist(x,y) wrt dimensions `diff_x` of first
        argument and `diff_y` of second argument.

        Parameters:
        - `X` and `X2`: arguments for `square_dist`.
        - `diff_x`: Tuple of integers. Dimensions to differentiate with respect
            to in the first argument of `square_dist(x,y)`.
        - `diff_y`: Tuple of integers. Dimensions to differentiate with respect
            to in the second argument of `square_dist(x,y)`.

        Return:
        - Appropriate derivative as a Tensorflow Tensor.
        '''
        x_order = len(diff_x)
        y_order = len(diff_y)
        diff_order = x_order + y_order
        dx = diff_x + diff_y

        Zero =  tf.zeros((tf.shape(X)[0], tf.shape(X)[0] if X2 is None else tf.shape(X2)[0]), dtype=float_type)
        One = tf.ones((tf.shape(X)[0], tf.shape(X)[0] if X2 is None else tf.shape(X2)[0]), dtype=float_type)

        if diff_order > 2:
            #higher derivatives vanish
            return Zero
        elif diff_order == 0:
            #if we don't take the derivative, return the function
            return self.square_dist(X, X2)
        elif diff_order == 1:

            X = X / self.lengthscales ** 2
            if X2 is None:
                X2 = X
            else:
                X2 = X2 / self.lengthscales ** 2
            delta = tf.expand_dims(X[:,dx[0]], 1) - tf.expand_dims(X2[:,dx[0]], 0)
            #sign-flip if y derivative
            if x_order > 0:
                return 2.*delta
            elif y_order > 0:
                return -2.*delta

        elif diff_order == 2:
            #only equal-index dimension pairs are non-zero
            i, j = dx
            if i == j:
                k = One*(2./self.lengthscales[i]**2.)
                if x_order == y_order:
                    return -k
                else:
                    return k
            else:
                return Zero
        raise RuntimeError

    def ind2dx(self, index):
        '''Convert a linear index into a tuple of integers to differentiate with respect to.'''
        return () if index == 0 else (index-1,)

    def kernel_derivative(self, X, X2, diff_x=(), diff_y=()):
        '''Calculates derivative of stationary kernel by decomposing into
        derivatives of the kernel wrt the distance and derivatives of the distance
        wrt the inputs. Procedurally generates each term of the derivative by
        iterating over all possible products of mixed derivatives.

        Parameters:
        - `X` and `X2`: Numpy/TF array. Inputs over which the kernel is evaluated.
        - `diff_x` and `diff_y`: Tuples of

        Return:
        - `result` -
        '''
        dist_d = partial(self.square_dist_d, X=X, X2=X2)
        dx = [(x,'x') for x in diff_x] + [(y,'y') for y in diff_y]
        x_order = len(diff_x)
        y_order = len(diff_y)
        diff_order = len(dx)
        if diff_order == 0:
            return self.derivative_base[0](self.square_dist(X, X2))
        dx_combinations = find_partitions(dx)
        order_factor = [self.derivative_base[order](self.square_dist(X, X2))
                        for order in range(diff_order+1)]
        terms = []
        for partition in dx_combinations:
            order = len(partition)
            derivatives = [dist_d(diff_x=tuple(i for i, key in elem if key is 'x'),
                                  diff_y=tuple(j for j, key in elem if key is 'y'))
                           for elem in partition]
            terms.append(order_factor[order]*tf.foldl(tf.multiply, derivatives))
        result = tf.foldl(tf.add, terms)
        return result

    def _kernel_factory(self, i, j):
        '''Return a function that calculates proper sub-kernel'''
        diff_x = self.ind2dx(i)
        diff_y = self.ind2dx(j)
        def f(X, X2):
            return self.kernel_derivative(X, X2, diff_x=diff_x, diff_y=diff_y)
        return f

class StationaryHessianKernel(StationaryGradKernel):
    '''Generalizes StationaryGradKernel to second deriatives'''
    def __init__(self, input_dim, active_dims = None):
        ndim = input_dim-1
        groups = 1 + ndim + ((ndim)*(ndim+1))//2
        BlockKernel.__init__(self, input_dim, groups, active_dims)
        self.variance = GPflow.param.Param(0.1, transforms.positive)
        self.lengthscales = GPflow.param.Param(1.*np.ones(input_dim-1), transforms.positive)
        self.tril_indices = np.column_stack(np.tril_indices(ndim))
        self.derivative_base = [lambda tau: self.variance*tf.exp(-0.5*tau), lambda tau: -0.5*self.variance*tf.exp(-0.5*tau), lambda tau: 0.25*self.variance*tf.exp(-0.5*tau),
        lambda tau: -0.125*self.variance*tf.exp(-0.5*tau), lambda tau: 0.0625*self.variance*tf.exp(-0.5*tau)]
        #self.derivative_base = [lambda tau: self.variance*(-0.5)**n*tf.exp(-0.5*tau) for n in range(5)]
        self.kerns = [[self._kernel_factory(i,j) for j in range(groups)] for i in range(groups)]


    def ind2dx(self, i):
        if i == 0:
            return ()
        elif i<self.input_dim:
            return (i-1,)
        else:
            return tuple(self.tril_indices[i-self.input_dim,:])


d = 1 #dimension
groups = 3 #observation types
res = 100 #plot resolution

#number of function, derivative, and second derivative observations
Ns = [10,5,0]
N = np.sum(Ns)


#generate data and annotate with observation type
ind = np.vstack([n*np.ones((Ns[n],1)) for n in range(3)])
val = np.vstack([np.linspace(0,1,n).reshape((-1,1)) for n in Ns])
X = np.concatenate([ind, val],1)

#test function and derivatives
f = lambda x: np.exp(-x)
fp = lambda x: -np.exp(-x)
fpp = lambda x: np.exp(-x)
y = np.vstack([fun(val[ind == i]).reshape((-1,1)) for fun, i in zip([f,fp,fpp], range(groups))])

#define kernels and GPs
k = StationaryHessianKernel(d+1)
kg = StationaryGradKernel(d+1)
m = GPflow.gpr.GPR(X, y, k)
mg = GPflow.gpr.GPR(X, y, kg)
m.likelihood.variance = 0.1
mg.likelihood.variance = 0.1

#compute kernel matrices
K = k.compute_K(X, X)
Kg = kg.compute_K(X, X)


#predict function, derivative, and second derivative
x = np.column_stack([np.linspace(-1,1, res),np.zeros(res)])
predlevel = lambda i: m.predict_f(np.column_stack([i*np.ones(res),x]))
mu, var = predlevel(0)
mug, varg = predlevel(1)
mug2, varg = predlevel(2)

#plot
pf = plt.plot(x[:,0],mu,label='Function')
#plt.scatter(X[X[:,0]==0,1],y[X[:,0]==0],color=pf[0].get_color())
pfp = plt.plot(x[:,0],mug,label='Derivative')
#plt.scatter(X[X[:,0]==1,1],y[X[:,0]==1],color=pfp[0].get_color())
pfpp = plt.plot(x[:,0],mug2,label='Derivative2')
#plt.scatter(*X[X[:,0]==2,1].T,color=pfpp[0].get_color())

plt.hlines(0, -1, 1)
plt.legend()
back to top