from pathlib import Path
from typing import Any, List, Optional, Sequence, Union
import numpy as np
import pytest
import tensorflow as tf
from _pytest.capture import CaptureFixture
from check_shapes import check_shape as cs
from check_shapes import check_shapes
from matplotlib.axes import Axes
from matplotlib.figure import Figure
import gpflow
from gpflow.base import AnyNDArray
from gpflow.models import GPR, GPModel
from gpflow.monitor import (
ExecuteCallback,
ImageToTensorBoard,
ModelToTensorBoard,
Monitor,
MonitorTask,
MonitorTaskGroup,
ScalarToTensorBoard,
)
class Data:
    """Shared sizes used by the fixtures and tests in this module."""

    # Number of rows in the randomly generated training data.
    num_data: int = 20
    # Number of iterations used by the step-based tests.
    num_steps: int = 2
class DummyTask(MonitorTask):
    """A monitor task whose `run` does nothing."""

    def run(self, **kwargs: Any) -> None:
        """Accept any keyword arguments and ignore them."""
        return None
class DummyStepCallback:
    """Step callback that remembers the most recent step it was called with."""

    current_step: int = 0

    @check_shapes(
        "variables[all]: [...]",
        "values[all]: [...]",
    )
    def callback(
        self,
        step: int,
        variables: Sequence[tf.Variable],
        values: Sequence[tf.Tensor],
    ) -> None:
        """Store `step` on the instance; `variables` and `values` are not used."""
        self.current_step = step
@pytest.fixture
@check_shapes()
def model() -> GPModel:
    """Build a GPR model on random data with a squared-exponential kernel."""
    # Both arrays are drawn in the same order as before so that the random
    # stream is consumed identically.
    inputs = cs(np.random.randn(Data.num_data, 2), "[N, 2]")
    outputs = cs(np.random.randn(Data.num_data, 2), "[N, 2]")
    se_kernel = gpflow.kernels.SquaredExponential(lengthscales=[1.0, 2.0])
    return GPR((inputs, outputs), se_kernel, noise_variance=0.01)
@pytest.fixture
def monitor(model: GPModel, tmp_path: Path) -> Monitor:
    """Build a monitor with a TensorBoard group (period 2) and a print group (period 1)."""
    log_dir = str(tmp_path)

    @check_shapes(
        "return: []",
    )
    def lml_callback() -> tf.Tensor:
        return model.log_marginal_likelihood()

    def print_callback() -> None:
        print("foo")

    tensorboard_tasks = [
        ModelToTensorBoard(log_dir, model),
        ScalarToTensorBoard(log_dir, lml_callback, "lml"),
    ]
    tensorboard_group = MonitorTaskGroup(tensorboard_tasks, period=2)
    print_group = MonitorTaskGroup(ExecuteCallback(print_callback), period=1)
    return Monitor(tensorboard_group, print_group)
def _get_size_directory(d: Path) -> int:
"""Calculating the size of a directory (in Bytes)."""
return sum(f.stat().st_size for f in d.glob("**/*"))
# Smoke tests for the individual tasks
# #####################################
def test_ExecuteCallback() -> None:
    """Smoke test `ExecuteCallback` in Eager and Compiled mode."""

    def callback() -> None:
        print("ExecuteCallback test")

    eager_task = ExecuteCallback(callback)
    eager_task(0)

    graph_task = tf.function(eager_task)
    graph_task(0)
def test_ImageToTensorBoard(tmp_path: Path) -> None:
    """Smoke test `ImageToTensorBoard` in Eager and Compiled mode."""
    log_dir = str(tmp_path)

    def plotting_cb(fig: Figure, axes: AnyNDArray) -> None:
        # Draw one random two-point line on each panel of the 2x2 grid,
        # visiting the panels in the same order as the explicit calls did.
        for row, col in ((0, 0), (1, 0), (0, 1), (1, 1)):
            axes[row, col].plot(np.random.randn(2), np.random.randn(2))

    task = ImageToTensorBoard(
        log_dir,
        plotting_cb,
        "image",
        fig_kw={"figsize": (10, 10)},
        subplots_kw={"sharex": True, "nrows": 2, "ncols": 2},
    )
    task(0)

    graph_task = tf.function(task)
    graph_task(0)
def test_ScalarToTensorBoard(tmp_path: Path) -> None:
    """Smoke test `ScalarToTensorBoard` in Eager and Compiled mode."""
    log_dir = str(tmp_path)

    def scalar_cb() -> float:
        return 0.0

    eager_task = ScalarToTensorBoard(log_dir, scalar_cb, "scalar")
    eager_task(0)

    graph_task = tf.function(eager_task)
    graph_task(0)
def test_ScalarToTensorBoard_with_argument(tmp_path: Path) -> None:
    """Smoke test `ScalarToTensorBoard` with a keyword argument for the callback.

    The task is called with `x=1.0` in Eager and Compiled mode; the callback
    asserts that it actually received a value for `x`.
    """
    log_dir = str(tmp_path)

    def scalar_cb(x: Optional[float] = None) -> float:
        assert x is not None
        return 2 * x

    eager_task = ScalarToTensorBoard(log_dir, scalar_cb, "scalar")
    graph_task = tf.function(eager_task)

    eager_task(0, x=1.0)
    graph_task(0, x=1.0)
def test_ScalarToTensorBoard_with_wrong_keyword_argument(tmp_path: Path) -> None:
    """Check that a keyword the callback does not accept raises `TypeError`.

    The expectation is verified first for the eager task and then for its
    `tf.function`-compiled counterpart.
    """
    log_dir = str(tmp_path)

    def scalar_cb(x: Optional[float] = None) -> float:
        assert x is not None
        return 2 * x

    eager_task = ScalarToTensorBoard(log_dir, scalar_cb, "scalar")
    graph_task = tf.function(eager_task)

    for runner in (eager_task, graph_task):
        with pytest.raises(TypeError, match=r"got an unexpected keyword argument 'y'"):
            runner(0, y=1.0)
def test_ModelToTensorboard(model: GPModel, tmp_path: Path) -> None:
    """Smoke test `ModelToTensorBoard` in Eager and Compiled mode."""
    eager_task = ModelToTensorBoard(str(tmp_path), model)
    eager_task(0)

    graph_task = tf.function(eager_task)
    graph_task(0)
def test_ExecuteCallback_arguments(capsys: CaptureFixture[str]) -> None:
    """Check that keyword arguments given to the monitor reach the callbacks.

    Each callback picks the keyword it cares about and swallows the rest via
    `**_`; the printed output shows the order in which they ran.
    """

    def cb1(x: Optional[int] = None, **_: Any) -> None:
        assert x is not None
        print(x)

    def cb2(**_: Any) -> None:
        print(2)

    def cb3(y: Optional[int] = None, **_: Any) -> None:
        assert y is not None
        print(y)

    first_group = MonitorTaskGroup([ExecuteCallback(cb1), ExecuteCallback(cb2)])
    second_group = MonitorTaskGroup(ExecuteCallback(cb3))
    monitor = Monitor(first_group, second_group)

    monitor(0, x=1, y=3)

    printed, _ = capsys.readouterr()
    assert printed == "1\n2\n3\n"
# Smoke test Monitor and MonitorTaskGroup
# ########################################
def none() -> None:
    """Do nothing; used as a no-op callback by the parametrised test below."""
@pytest.mark.parametrize(
    "task_or_tasks",
    [
        ExecuteCallback(none),
        [ExecuteCallback(none)],
        [ExecuteCallback(none), ExecuteCallback(none)],
    ],
)
def test_MonitorTaskGroup_and_Monitor(task_or_tasks: Union[MonitorTask, List[MonitorTask]]) -> None:
    """Smoke test `MonitorTaskGroup` and `Monitor` for a single task and for lists of tasks."""
    group = MonitorTaskGroup(task_or_tasks, period=2)

    # Check that the tasks are actually stored as a list (custom setter).
    # The `assert` is required: a bare `isinstance(...)` expression is
    # evaluated and discarded, so it would never fail the test.
    assert isinstance(group.tasks, list)

    # Smoke test the __call__
    group(0)
    compiled_group = tf.function(group)
    compiled_group(0)

    # Smoke test the Monitor wrapper
    monitor = Monitor(group)
    monitor(0)
    compiled_monitor = tf.function(monitor)
    compiled_monitor(0)
def test_Monitor(monitor: Monitor) -> None:
    """Smoke test the `monitor` fixture in Eager and Compiled mode."""
    monitor(0)

    graph_monitor = tf.function(monitor)
    graph_monitor(0)
# Functionality tests
# ###################
def test_compiled_execute_callable(capsys: CaptureFixture[str]) -> None:
    """
    Test that a compiled `ExecuteCallback` behaves as expected.

    The callback's Python `print` runs on every eager call, but only once
    across all calls of the `tf.function`-compiled task.
    """
    message = "Eager mode"

    def callback() -> None:
        print(message)

    task = ExecuteCallback(callback)

    # Eager mode: one printed line per step.
    for step in range(Data.num_steps):
        task(step)
    eager_out, _ = capsys.readouterr()
    assert eager_out == (f"{message}\n" * Data.num_steps)

    # Autograph mode: a single printed line regardless of the number of steps.
    graph_task = tf.function(task)
    for step in tf.range(Data.num_steps):
        graph_task(step)
    graph_out, _ = capsys.readouterr()
    assert graph_out == f"{message}\n"
def test_periodicity_group(capsys: CaptureFixture[str]) -> None:
    """Test that groups are called at different periods."""
    task_a = ExecuteCallback(lambda: print("a", end=" "))
    task_b = ExecuteCallback(lambda: print("b", end=" "))
    task_X = ExecuteCallback(lambda: print("X", end=" "))
    group_often = MonitorTaskGroup([task_a, task_b], period=1)
    group_seldom = MonitorTaskGroup([task_X], period=3)
    monitor = Monitor(group_often, group_seldom)

    # Eager mode: "a b" is printed on every step, "X" on steps 0, 3 and 6.
    for i in range(7):
        monitor(i)
    out, _ = capsys.readouterr()
    expected = "a b X a b a b a b X a b a b a b X "
    assert out == expected

    # AutoGraph mode
    compiled_monitor = tf.function(monitor)
    for i in tf.range(7):
        compiled_monitor(i)

    # When using TF's range and compiling the monitor we only expect the
    # Python prints once. The captured output must be compared explicitly:
    # asserting a bare non-empty string literal always passes.
    out, _ = capsys.readouterr()
    assert out == "a b X "
def test_logdir_created(monitor: Monitor, model: GPModel, tmp_path: Path) -> None:
    """
    Check that running the monitor makes the log directory grow.
    """
    # The directory must exist and already hold some data before training.
    assert tmp_path.is_dir()
    bytes_before = _get_size_directory(tmp_path)
    assert bytes_before > 0

    adam = tf.optimizers.Adam()
    for i in range(Data.num_steps):
        adam.minimize(model.training_loss, model.trainable_variables)
        monitor(i)

    bytes_after = _get_size_directory(tmp_path)
    assert bytes_after > bytes_before
def test_compile_monitor(monitor: Monitor, model: GPModel) -> None:
    """Smoke test an optimisation step and the monitor inside one `tf.function`."""
    adam = tf.optimizers.Adam()

    @tf.function
    def optimisation_step(step: tf.Tensor) -> None:
        adam.minimize(model.training_loss, model.trainable_variables)
        monitor(step)

    for i in tf.range(100):
        optimisation_step(i)
def test_scipy_monitor(monitor: Monitor, model: GPModel) -> None:
    """Smoke test passing the monitor as `step_callback` to the Scipy optimizer."""
    scipy_opt = gpflow.optimizers.Scipy()
    scipy_opt.minimize(
        model.training_loss,
        model.trainable_variables,
        step_callback=monitor,
    )
def test_scipy_monitor_called(model: GPModel) -> None:
    """Check that the Scipy optimizer drives a monitored task past step 1."""
    dummy = DummyTask()
    monitor = Monitor(MonitorTaskGroup(dummy, period=1))

    scipy_opt = gpflow.optimizers.Scipy()
    scipy_opt.minimize(
        model.training_loss,
        model.trainable_variables,
        step_callback=monitor,
    )

    assert dummy.current_step > 1
def test_scipy_step_callback_called(model: GPModel) -> None:
    """Check the step callback's last step against `nit`, with no loss history recorded."""
    recorder = DummyStepCallback()
    scipy_opt = gpflow.optimizers.Scipy()

    result = scipy_opt.minimize(
        model.training_loss,
        model.trainable_variables,
        step_callback=recorder.callback,
    )

    # Last recorded step plus one must equal the iteration count and be positive.
    assert result.nit == recorder.current_step + 1 > 0
    assert not hasattr(result, "loss_history")
def test_scipy_loss_history(model: GPModel) -> None:
    """Check the loss history returned when `track_loss_history=True`."""
    scipy_opt = gpflow.optimizers.Scipy()
    result = scipy_opt.minimize(
        model.training_loss,
        model.trainable_variables,
        track_loss_history=True,
    )

    # One entry per iteration, and more than one iteration.
    assert result.nit == len(result.loss_history) > 1
    # The first recorded loss is larger than the last one.
    assert result.loss_history[0] > result.loss_history[-1]
    # The last entry matches both the reported optimum and the model's current loss.
    assert result.loss_history[-1] == result.fun == model.training_loss()
def test_scipy_step_callback_called_with_history(model: GPModel) -> None:
    """Check that a step callback and loss-history tracking agree on the iteration count."""
    recorder = DummyStepCallback()
    scipy_opt = gpflow.optimizers.Scipy()

    minimize_kwargs = dict(step_callback=recorder.callback, track_loss_history=True)
    result = scipy_opt.minimize(
        model.training_loss,
        model.trainable_variables,
        **minimize_kwargs,
    )

    assert result.nit == len(result.loss_history) == recorder.current_step + 1 > 0