# Copyright 2017 the GPflow authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import tensorflow as tf import numpy as np import pandas as pd import gpflow from gpflow import settings from gpflow.test_util import GPflowTestCase from numpy.testing import assert_allclose class TestDataholder(GPflowTestCase): def test_create_dataholder(self): with self.test_context(): shape = (10,) d = gpflow.DataHolder(np.ones(shape)) self.assertAllEqual(d.shape, shape) self.assertEqual(d.dtype, np.float64) self.assertFalse(d.fixed_shape) self.assertFalse(d.trainable) shape = (10,) d = gpflow.DataHolder(np.ones(shape), dtype=gpflow.settings.float_type) self.assertAllEqual(d.shape, shape) self.assertEqual(d.dtype, gpflow.settings.float_type) self.assertFalse(d.fixed_shape) self.assertFalse(d.trainable) d = gpflow.DataHolder(1) self.assertAllEqual(d.shape, ()) self.assertEqual(d.dtype, np.int32) self.assertFalse(d.fixed_shape) self.assertFalse(d.trainable) d = gpflow.DataHolder(1.0) self.assertAllEqual(d.shape, ()) self.assertEqual(d.dtype, np.float64) self.assertFalse(d.fixed_shape) self.assertFalse(d.trainable) size = 10 shape = (size,) d = gpflow.DataHolder([1.] * size) self.assertAllEqual(d.shape, shape) self.assertEqual(d.dtype, np.float64) self.assertFalse(d.fixed_shape) self.assertFalse(d.trainable) d = gpflow.DataHolder(1.0, fix_shape=True) self.assertAllEqual(d.shape, ()) self.assertEqual(d.dtype, np.float64) self.assertTrue(d.fixed_shape) self.assertFalse(d.trainable) var = tf.get_variable('dataholder', shape=(), trainable=False) d = gpflow.DataHolder(var) self.assertAllEqual(d.shape, ()) self.assertEqual(d.dtype, np.float32) self.assertTrue(d.fixed_shape) self.assertFalse(d.trainable) tensor = var + 1 d = gpflow.DataHolder(tensor) self.assertAllEqual(d.shape, ()) self.assertEqual(d.dtype, np.float32) self.assertTrue(d.fixed_shape) self.assertFalse(d.trainable) def test_is_built(self): with self.test_context(): d = gpflow.DataHolder(1.0) with self.assertRaises(ValueError): d.is_built(None) with self.assertRaises(gpflow.GPflowError): d.is_built_coherence(tf.Graph()) def test_failed_creation(self): with self.test_context(): tensor = tf.get_variable('dataholder', shape=(1,)), values = [ tensor, [1, [1, [1]]], None, "test", object(), ] for value in values: with self.assertRaises(ValueError, msg='Value {}'.format(value)): gpflow.DataHolder(tensor) def test_fixed_shape(self): with self.test_context(): p = gpflow.DataHolder(1.) assert_allclose(1., 1.) self.assertFalse(p.fixed_shape) self.assertAllEqual(p.shape, ()) value = [10., 10.] p.assign(value) assert_allclose(p.read_value(), value) self.assertFalse(p.fixed_shape) self.assertAllEqual(p.shape, (2,)) p.fix_shape() assert_allclose(p.read_value(), value) self.assertTrue(p.fixed_shape) self.assertAllEqual(p.shape, (2,)) p.assign(np.zeros(p.shape)) value = np.zeros(p.shape) with self.assertRaises(ValueError): p.assign([1.], force=True) assert_allclose(p.read_value(), value) with self.assertRaises(ValueError): p.assign(1., force=True) assert_allclose(p.read_value(), value) with self.assertRaises(ValueError): p.assign(np.zeros((3, 3)), force=True) assert_allclose(p.read_value(), value) class TestMinibatch(GPflowTestCase): def test_create(self): with self.test_context(): values = [tf.get_variable('test', shape=()), "test", None] for v in values: with self.assertRaises(ValueError): gpflow.Minibatch(v) def test_clear(self): with self.test_context() as session: length = 10 seed = 10 arr = np.random.randn(length, 2) m = gpflow.Minibatch(arr, shuffle=False) self.assertEqual(m.is_built_coherence(), gpflow.Build.YES) self.assertEqual(m.seed, None) with self.assertRaises(gpflow.GPflowError): m.seed = seed self.assertEqual(m.seed, None) for i in range(length): assert_allclose(m.read_value(session=session), [arr[i]]) m.clear() self.assertEqual(m.seed, None) m.seed = seed self.assertEqual(m.seed, seed) self.assertEqual(m.is_built_coherence(), gpflow.Build.NO) self.assertEqual(m.parameter_tensor, None) def test_seed(self): with self.test_context() as session: length = 10 arr = np.random.randn(length, 2) batch_size = 2 m1 = gpflow.Minibatch(arr, seed=1, batch_size=batch_size) m2 = gpflow.Minibatch(arr, seed=1, batch_size=batch_size) self.assertEqual(m1.is_built_coherence(), gpflow.Build.YES) self.assertEqual(m1.seed, 1) with self.assertRaises(gpflow.GPflowError): m1.seed = 10 self.assertEqual(m2.is_built_coherence(), gpflow.Build.YES) self.assertEqual(m2.seed, 1) with self.assertRaises(gpflow.GPflowError): m2.seed = 10 self.assertEqual(m1.seed, 1) self.assertEqual(m2.seed, 1) for i in range(length): m1_value = m1.read_value(session=session) m2_value = m2.read_value(session=session) self.assertEqual(m1_value.shape[0], batch_size, msg='Index range "{}"'.format(i)) self.assertEqual(m2_value.shape[0], batch_size, msg='Index range "{}"'.format(i)) assert_allclose(m1_value, m2_value) def test_change_variable_size(self): with self.test_context() as session: m = gpflow.Parameterized() length = 10 arr = np.random.randn(length, 2) m.X = gpflow.Minibatch(arr, shuffle=False) for i in range(length): assert_allclose(m.X.read_value(session=session), [arr[i]]) length = 20 arr = np.random.randn(length, 2) m.X = arr for i in range(length): assert_allclose(m.X.read_value(session=session), [arr[i]]) def test_change_batch_size(self): with self.test_context() as session: length = 10 arr = np.random.randn(length, 2) m = gpflow.Minibatch(arr, shuffle=False) for i in range(length): assert_allclose(m.read_value(session=session), [arr[i]]) def check_batch_size(m, length, batch_size): self.assertEqual(m.batch_size, batch_size) for i in range(length//batch_size): value = m.read_value(session=session) self.assertEqual(value.shape[0], batch_size, msg='Index range "{}"'.format(i)) batch_size = 2 m.set_batch_size(batch_size) check_batch_size(m, length, batch_size) batch_size = 5 m.batch_size = batch_size check_batch_size(m, length, batch_size) batch_size = 10 m.set_batch_size(batch_size) check_batch_size(m, length, batch_size)