https://github.com/GPflow/GPflow
Tip revision: 024d381db3be2215320aca9a0ba6b1425a0a7f79 authored by Khurram Ghani on 03 May 2023, 22:29:57 UTC
Merge pull request #2068 from GPflow/develop
Merge pull request #2068 from GPflow/develop
Tip revision: 024d381
test_kernels.py
# Copyright 2018 the GPflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Optional, Sequence, Tuple, Type, Union, cast
import numpy as np
import pytest
import tensorflow as tf
from check_shapes import check_shapes
from numpy.testing import assert_allclose
import gpflow.ci_utils
from gpflow.base import AnyNDArray, TensorType
from gpflow.config import default_float
from gpflow.kernels import (
RBF,
AnisotropicStationary,
ArcCosine,
ChangePoints,
Constant,
Convolutional,
Coregion,
Cosine,
IsotropicStationary,
Kernel,
Linear,
LinearCoregionalization,
Matern12,
Matern32,
Matern52,
MultioutputKernel,
Periodic,
Polynomial,
RationalQuadratic,
SeparateIndependent,
SharedIndependent,
SquaredExponential,
Stationary,
White,
)
from gpflow.kernels.categorical import _concat_inputs_with_latents
from tests.gpflow.kernels.reference import ref_arccosine_kernel, ref_periodic_kernel, ref_rbf_kernel
rng = np.random.RandomState(1)
@check_shapes(
"X: [N, D]",
"locations: [L]",
"steepness: [broadcast L]",
)
def _ref_changepoints(
X: AnyNDArray,
kernels: Sequence[Kernel],
locations: Sequence[float],
steepness: Union[float, Sequence[float]],
) -> AnyNDArray:
"""
Calculates K(X) for each kernel in `kernels`, then multiply by sigmoid functions
in order to smoothly transition betwen them. The sigmoid transitions are defined
by a location and a steepness parameter.
"""
locations = sorted(locations)
steepness = steepness if isinstance(steepness, Sequence) else [steepness] * len(locations)
np_locations: AnyNDArray = np.array(locations).reshape((1, 1, -1))
np_steepness: AnyNDArray = np.array(steepness).reshape((1, 1, -1))
sig_X = 1.0 / (1.0 + np.exp(-np_steepness * (X[:, :, None] - np_locations)))
starters = sig_X * np.transpose(sig_X, axes=(1, 0, 2))
stoppers = (1 - sig_X) * np.transpose((1 - sig_X), axes=(1, 0, 2))
ones = np.ones((X.shape[0], X.shape[0], 1))
starters = np.concatenate([ones, starters], axis=2)
stoppers = np.concatenate([stoppers, ones], axis=2)
kernel_stack: AnyNDArray = np.stack([k(X) for k in kernels], axis=2)
return cast(AnyNDArray, (kernel_stack * starters * stoppers).sum(axis=2))
@pytest.mark.parametrize("variance, lengthscales", [[2.3, 1.4]])
def test_rbf_1d(variance: TensorType, lengthscales: TensorType) -> None:
X = rng.randn(3, 1)
kernel = SquaredExponential(lengthscales=lengthscales, variance=variance)
gram_matrix = kernel(X)
reference_gram_matrix = ref_rbf_kernel(X, lengthscales, variance)
assert_allclose(gram_matrix, reference_gram_matrix)
@pytest.mark.parametrize("variance, lengthscales", [[2.3, 1.4]])
def test_rq_1d(variance: TensorType, lengthscales: TensorType) -> None:
kSE = SquaredExponential(lengthscales=lengthscales, variance=variance)
kRQ = RationalQuadratic(lengthscales=lengthscales, variance=variance, alpha=1e8)
rng = np.random.RandomState(1)
X: AnyNDArray = rng.randn(6, 1).astype(default_float())
gram_matrix_SE = kSE(X)
gram_matrix_RQ = kRQ(X)
assert_allclose(gram_matrix_SE, gram_matrix_RQ)
@check_shapes(
"variance: []",
"weight_variances: [broadcast n_active_dims]",
"bias_variance: []",
"X: [N, D]",
)
def _assert_arccosine_kern_err(
variance: TensorType,
weight_variances: TensorType,
bias_variance: TensorType,
order: int,
X: TensorType,
) -> None:
kernel = ArcCosine(
order=order,
variance=variance,
weight_variances=weight_variances,
bias_variance=bias_variance,
)
gram_matrix = kernel(X)
reference_gram_matrix = ref_arccosine_kernel(
X, order, weight_variances, bias_variance, variance
)
assert_allclose(gram_matrix, reference_gram_matrix)
@pytest.mark.parametrize("order", ArcCosine.implemented_orders)
@pytest.mark.parametrize("D, weight_variances", [[1, 1.7], [3, 1.7], [3, (1.1, 1.7, 1.9)]])
@pytest.mark.parametrize("N, bias_variance, variance", [[3, 0.6, 2.3]])
def test_arccosine_1d_and_3d(
order: int,
D: int,
N: int,
weight_variances: Union[float, Sequence[float]],
bias_variance: float,
variance: float,
) -> None:
X_data = rng.randn(N, D)
_assert_arccosine_kern_err(variance, weight_variances, bias_variance, order, X_data)
@pytest.mark.parametrize("order", [42])
def test_arccosine_non_implemented_order(order: int) -> None:
with pytest.raises(ValueError):
ArcCosine(order=order)
@pytest.mark.parametrize("D, N", [[1, 4]])
def test_arccosine_nan_gradient(D: int, N: int) -> None:
X = rng.rand(N, D)
kernel = ArcCosine()
with tf.GradientTape() as tape:
Kff = kernel(X)
grads = tape.gradient(Kff, kernel.trainable_variables)
assert not np.any(np.isnan(grads))
@pytest.mark.parametrize(
"base_class",
[
SquaredExponential,
Matern12,
Matern32,
Matern52,
],
)
@pytest.mark.parametrize(
"D, lengthscales, period",
[
[1, 2.0, 3.0], # 1d, single lengthscale, single period
[2, 11.5, 3.0], # 2d, single lengthscale, single period
[2, 11.5, (3.0, 6.0)], # 2d, single lengthscale, ard period
[2, (11.5, 12.5), 3.0], # 2d, ard lengthscales, single period
[2, (11.5, 12.5), (3.0, 6.0)], # 2d, ard lengthscales, ard period
],
)
@pytest.mark.parametrize(
"N, variance",
[
[3, 2.3],
[5, 1.3],
],
)
def test_periodic(
base_class: Type[Stationary],
D: int,
N: int,
lengthscales: TensorType,
variance: TensorType,
period: TensorType,
) -> None:
X = rng.randn(N, D) if D == 1 else rng.multivariate_normal(np.zeros(D), np.eye(D), N)
base_kernel = base_class(lengthscales=lengthscales, variance=variance)
kernel = Periodic(base_kernel, period=period)
gram_matrix = kernel(X)
reference_gram_matrix = ref_periodic_kernel(
X, base_class.__name__, lengthscales, variance, period
)
assert_allclose(gram_matrix, reference_gram_matrix)
@pytest.mark.parametrize(
"base_class",
[
SquaredExponential,
Matern12,
],
)
def test_periodic_diag(base_class: Type[Stationary]) -> None:
N, D = 5, 3
X = rng.multivariate_normal(np.zeros(D), np.eye(D), N)
base_kernel = base_class(lengthscales=2.0, variance=1.0)
kernel = Periodic(base_kernel, period=6.0)
assert_allclose(base_kernel(X, full_cov=False), kernel(X, full_cov=False))
def test_periodic_non_stationary_base_kernel() -> None:
error_msg = r"Periodic requires an IsotropicStationary kernel as the `base_kernel`"
with pytest.raises(TypeError, match=error_msg):
Periodic(Linear())
def test_periodic_bad_ard_period() -> None:
error_msg = r"Size of `active_dims` \[1 2\] does not match size of ard parameter \(3\)"
base_kernel = RBF(active_dims=[1, 2])
with pytest.raises(ValueError, match=error_msg):
Periodic(base_kernel, period=[1.0, 1.0, 1.0])
kernel_setups: Tuple[Kernel, ...] = tuple(
kernel()
for kernel in gpflow.ci_utils.subclasses(Stationary)
if kernel not in (IsotropicStationary, AnisotropicStationary)
) + (
Constant(),
Linear(),
Polynomial(),
ArcCosine(),
)
@pytest.mark.parametrize("D", [1, 5])
@pytest.mark.parametrize("kernel", kernel_setups)
@pytest.mark.parametrize("N", [10])
def test_kernel_symmetry_1d_and_5d(D: int, kernel: Kernel, N: int) -> None:
X = rng.randn(N, D)
errors = kernel(X) - kernel(X, X)
assert np.allclose(errors, 0)
@pytest.mark.parametrize("N, N2, input_dim, output_dim, rank", [[10, 12, 1, 3, 2]])
def test_coregion_shape(N: int, N2: int, input_dim: int, output_dim: int, rank: int) -> None:
X = np.random.randint(0, output_dim, (N, input_dim))
X2 = np.random.randint(0, output_dim, (N2, input_dim))
kernel = Coregion(output_dim=output_dim, rank=rank)
kernel.W.assign(rng.randn(output_dim, rank))
kernel.kappa.assign(np.exp(rng.randn(output_dim, 1).reshape(-1)))
Kff2 = kernel(X, X2)
assert Kff2.shape == (10, 12)
Kff = kernel(X)
assert Kff.shape == (10, 10)
@pytest.mark.parametrize("N, input_dim, output_dim, rank", [[10, 1, 3, 2]])
def test_coregion_diag(N: int, input_dim: int, output_dim: int, rank: int) -> None:
X = np.random.randint(0, output_dim, (N, input_dim))
kernel = Coregion(output_dim=output_dim, rank=rank)
kernel.W.assign(rng.randn(output_dim, rank))
kernel.kappa.assign(np.exp(rng.randn(output_dim, 1).reshape(-1)))
K = kernel(X)
Kdiag = kernel.K_diag(X)
assert np.allclose(np.diag(K), Kdiag)
@pytest.mark.parametrize("N, input_dim, output_dim, rank", [[10, 1, 3, 2]])
def test_coregion_slice(N: int, input_dim: int, output_dim: int, rank: int) -> None:
X = np.random.randint(0, output_dim, (N, input_dim))
X = np.hstack((X, rng.randn(10, 1)))
kernel1 = Coregion(output_dim=output_dim, rank=rank, active_dims=[0])
# compute another kernel with additinoal inputs,
# make sure out kernel is still okay.
kernel2 = SquaredExponential(active_dims=[1])
kernel_prod = kernel1 * kernel2
K1 = kernel_prod(X)
K2 = kernel1(X) * kernel2(X) # slicing happens inside kernel
assert np.allclose(K1, K2)
_dim = 3
kernel_setups_extended: Tuple[Kernel, ...] = (
kernel_setups
+ (
SquaredExponential() + Linear(),
SquaredExponential() * Linear(),
SquaredExponential() + Linear(variance=rng.rand(_dim)),
)
+ tuple(ArcCosine(order=order) for order in ArcCosine.implemented_orders)
)
@pytest.mark.parametrize("kernel", kernel_setups_extended)
@pytest.mark.parametrize("N, dim", [[30, _dim]])
def test_diags(kernel: Kernel, N: int, dim: int) -> None:
X = np.random.randn(N, dim)
kernel1 = tf.linalg.diag_part(kernel(X, full_cov=True))
kernel2 = kernel(X, full_cov=False)
assert np.allclose(kernel1, kernel2)
def test_conv_diag() -> None:
kernel = Convolutional(SquaredExponential(), [3, 3], [2, 2])
X = np.random.randn(3, 9)
kernel_full = np.diagonal(kernel(X, full_cov=True))
kernel_diag = kernel(X, full_cov=False)
assert np.allclose(kernel_full, kernel_diag)
assert 4 == kernel.patch_len
assert 4 == kernel.num_patches
# Add a rbf and linear kernel, make sure the result is the same as adding the result of
# the kernels separately.
_kernel_setups_add: Tuple[Kernel, ...] = (
SquaredExponential(),
Linear(),
SquaredExponential() + Linear(),
)
@pytest.mark.parametrize("N, D", [[10, 1]])
def test_add_symmetric(N: int, D: int) -> None:
X = rng.randn(N, D)
Kffs = [kernel(X) for kernel in _kernel_setups_add]
assert np.allclose(Kffs[0] + Kffs[1], Kffs[2])
@pytest.mark.parametrize("N, M, D", [[10, 12, 1]])
def test_add_asymmetric(N: int, M: int, D: int) -> None:
X, Z = rng.randn(N, D), rng.randn(M, D)
Kfus = [kernel(X, Z) for kernel in _kernel_setups_add]
assert np.allclose(Kfus[0] + Kfus[1], Kfus[2])
@pytest.mark.parametrize("N, D", [[10, 1]])
def test_white(N: int, D: int) -> None:
"""
The white kernel should not give the same result when called with k(X) and
k(X, X)
"""
X = rng.randn(N, D)
kernel = White()
Kff_sym = kernel(X)
Kff_asym = kernel(X, X)
assert not np.allclose(Kff_sym, Kff_asym)
_kernel_classes_slice = [
kernel
for kernel in gpflow.ci_utils.subclasses(Stationary)
if kernel not in (IsotropicStationary, AnisotropicStationary)
] + [
Constant,
Linear,
Polynomial,
]
_kernel_triples_slice = [
(k1(active_dims=[0]), k2(active_dims=[1]), k3(active_dims=slice(0, 1)))
for k1, k2, k3 in zip(_kernel_classes_slice, _kernel_classes_slice, _kernel_classes_slice)
]
@pytest.mark.parametrize("kernel_triple", _kernel_triples_slice)
@pytest.mark.parametrize("N, D", [[20, 2]])
def test_slice_symmetric(kernel_triple: Tuple[Kernel, Kernel, Kernel], N: int, D: int) -> None:
X = rng.randn(N, D)
K1, K3 = kernel_triple[0](X), kernel_triple[2](X[:, :1])
assert np.allclose(K1, K3)
K2, K4 = kernel_triple[1](X), kernel_triple[2](X[:, 1:])
assert np.allclose(K2, K4)
@pytest.mark.parametrize("kernel_triple", _kernel_triples_slice)
@pytest.mark.parametrize("N, M, D", [[10, 12, 2]])
def test_slice_asymmetric(
kernel_triple: Tuple[Kernel, Kernel, Kernel], N: int, M: int, D: int
) -> None:
X = rng.randn(N, D)
Z = rng.randn(M, D)
K1, K3 = kernel_triple[0](X, Z), kernel_triple[2](X[:, :1], Z[:, :1])
assert np.allclose(K1, K3)
K2, K4 = kernel_triple[1](X, Z), kernel_triple[2](X[:, 1:], Z[:, 1:])
assert np.allclose(K2, K4)
_kernel_setups_prod: Tuple[Kernel, ...] = (
Matern32(),
Matern52(lengthscales=0.3),
Matern32() * Matern52(lengthscales=0.3),
)
@pytest.mark.parametrize("N, D", [[30, 2]])
def test_product(N: int, D: int) -> None:
X = rng.randn(N, D)
Kffs = [kernel(X) for kernel in _kernel_setups_prod]
assert np.allclose(Kffs[0] * Kffs[1], Kffs[2])
@pytest.mark.parametrize("N, D", [[30, 4], [10, 7]])
def test_active_product(N: int, D: int) -> None:
X = rng.randn(N, D)
dims, rand_idx, ls = (
list(range(D)),
int(rng.randint(0, D)),
rng.uniform(1.0, 7.0, D),
)
active_dims_list = [dims[:rand_idx] + dims[rand_idx + 1 :], [rand_idx], dims]
lengthscales_list = [
np.hstack([ls[:rand_idx], ls[rand_idx + 1 :]]),
ls[rand_idx],
ls,
]
kernels = [
SquaredExponential(lengthscales=lengthscales, active_dims=dims)
for dims, lengthscales in zip(active_dims_list, lengthscales_list)
]
kernel_prod = kernels[0] * kernels[1]
Kff = kernels[2](X)
Kff_prod = kernel_prod(X)
assert np.allclose(Kff, Kff_prod)
@pytest.mark.parametrize("D", [4, 7])
def test_ard_init_scalar(D: int) -> None:
"""
For ard kernels, make sure that kernels can be instantiated with a single
scalar lengthscale or a suitable array of lengthscales
"""
kernel_1 = SquaredExponential(lengthscales=2.3)
kernel_2 = SquaredExponential(lengthscales=np.ones(D) * 2.3)
lengthscales_1 = kernel_1.lengthscales.numpy()
lengthscales_2 = kernel_2.lengthscales.numpy()
assert np.allclose(lengthscales_1, lengthscales_2, atol=1e-10)
def test_ard_invalid_active_dims() -> None:
msg = r"Size of `active_dims` \[1\] does not match size of ard parameter \(2\)"
with pytest.raises(ValueError, match=msg):
SquaredExponential(lengthscales=np.ones(2), active_dims=[1])
@pytest.mark.parametrize(
"kernel_class, param_name",
[
[SquaredExponential, "lengthscales"],
[Linear, "variance"],
[ArcCosine, "weight_variances"],
[Cosine, "lengthscales"],
],
)
@pytest.mark.parametrize(
"param_value, ard",
[
[1.0, False],
[[1.0], True],
[[1.0, 1.0], True],
],
)
def test_ard_property(
kernel_class: Type[Kernel], param_name: str, param_value: Any, ard: bool
) -> None:
kernel = kernel_class(**{param_name: param_value})
assert kernel.ard is ard
@pytest.mark.parametrize(
"locations, steepness, error_msg",
[
# 1. Kernels locations dimension mismatch
[
[1.0],
1.0,
r"Number of kernels \(3\) must be one more than the number of changepoint locations \(1\)",
],
# 2. Locations steepness dimension mismatch
[
[1.0, 2.0],
[1.0],
r"Dimension of steepness \(1\) does not match number of changepoint locations \(2\)",
],
],
)
def test_changepoints_init_fail(
locations: Sequence[float], steepness: Union[float, Sequence[float]], error_msg: str
) -> None:
kernels: Sequence[Kernel] = [
Matern12(),
Linear(),
Matern32(),
]
with pytest.raises(ValueError, match=error_msg):
ChangePoints(kernels, locations, steepness)
@check_shapes(
"X: [N, D]",
"locations: [L]",
"steepness: [broadcast L]",
)
def _assert_changepoints_kern_err(
X: TensorType,
kernels: Sequence[Kernel],
locations: Sequence[float],
steepness: Union[float, Sequence[float]],
) -> None:
kernel = ChangePoints(kernels, locations, steepness=steepness)
reference_gram_matrix = _ref_changepoints(X, kernels, locations, steepness)
assert_allclose(kernel(X), reference_gram_matrix)
assert_allclose(kernel.K_diag(X), np.diag(reference_gram_matrix))
@pytest.mark.parametrize("N", [2, 10])
@pytest.mark.parametrize(
"kernels, locations, steepness",
[
# 1. Single changepoint
[[Constant(), Constant()], [2.0], 5.0],
# 2. Two changepoints
[
[
Constant(),
Constant(),
Constant(),
],
[1.0, 2.0],
5.0,
],
# 3. Multiple steepness
[
[
Constant(),
Constant(),
Constant(),
],
[1.0, 2.0],
[5.0, 10.0],
],
# 4. Variety of kernels
[
[
Matern12(),
Linear(),
SquaredExponential(),
Constant(),
],
[1.0, 2.0, 3.0],
5.0,
],
],
)
def test_changepoints(
N: int,
kernels: Sequence[Kernel],
locations: Sequence[float],
steepness: Union[float, Sequence[float]],
) -> None:
X_data = rng.randn(N, 1)
_assert_changepoints_kern_err(X_data, kernels, locations, steepness)
@pytest.mark.parametrize(
"active_dims_1, active_dims_2, is_separate",
[
[[1, 2, 3], None, False],
[None, [1, 2, 3], False],
[None, None, False],
[[1, 2, 3], [3, 4, 5], False],
[[1, 2, 3], [4, 5, 6], True],
],
)
def test_on_separate_dims(
active_dims_1: Optional[Sequence[int]],
active_dims_2: Optional[Sequence[int]],
is_separate: bool,
) -> None:
kernel_1 = Linear(active_dims=active_dims_1)
kernel_2 = SquaredExponential(active_dims=active_dims_2)
assert kernel_1.on_separate_dims(kernel_2) == is_separate
assert kernel_2.on_separate_dims(kernel_1) == is_separate
assert kernel_1.on_separate_dims(kernel_1) is False
assert kernel_2.on_separate_dims(kernel_2) is False
@pytest.mark.parametrize("kernel", kernel_setups_extended)
def test_kernel_call_diag_and_X2_errors(kernel: Kernel) -> None:
X = rng.randn(4, 1)
X2 = rng.randn(5, 1)
with pytest.raises(ValueError):
kernel(X, X2, full_cov=False)
def test_periodic_active_dims_matches() -> None:
active_dims = [1]
base_kernel = SquaredExponential(active_dims=active_dims)
kernel = Periodic(base_kernel=base_kernel)
assert kernel.active_dims == base_kernel.active_dims
# type-ignores below, is because mypy doesn't understand that the setter and the getter for
# `active_dims` have different types.
kernel.active_dims = [2] # type: ignore[assignment]
assert kernel.active_dims == base_kernel.active_dims
base_kernel.active_dims = [3] # type: ignore[assignment]
assert kernel.active_dims == base_kernel.active_dims
def test_latent_kernels() -> None:
kernel_list: Tuple[Kernel, ...] = (SquaredExponential(), White(), White() + Linear())
multioutput_kernel_list: Tuple[MultioutputKernel, ...] = (
SharedIndependent(SquaredExponential(), 3),
SeparateIndependent(kernel_list),
LinearCoregionalization(kernel_list, np.random.random((5, 3))),
)
assert len(multioutput_kernel_list[0].latent_kernels) == 1
assert multioutput_kernel_list[1].latent_kernels == tuple(kernel_list)
assert multioutput_kernel_list[2].latent_kernels == tuple(kernel_list)
def test_combination_LMC_kernels() -> None:
N, D, P = 100, 3, 2
kernel_list1: Tuple[Kernel, ...] = (Linear(active_dims=[1]), SquaredExponential())
L1 = len(kernel_list1)
kernel_list2: Tuple[Kernel, ...] = (SquaredExponential(), Linear(), Linear())
L2 = len(kernel_list2)
k1 = LinearCoregionalization(kernel_list1, np.random.randn(P, L1))
k2 = LinearCoregionalization(kernel_list2, np.random.randn(P, L2))
kernel = k1 + k2
X = np.random.randn(N, D)
K1 = k1(X, full_cov=True)
K2 = k2(X, full_cov=True)
K = kernel(X, full_cov=True)
assert K.shape == [N, P, N, P]
np.testing.assert_allclose(K, K1 + K2)
def test_concat_inputs_with_latents() -> None:
X = tf.constant(
[
[0.11333948, 0.0],
[0.68914756, 0.0],
[0.49686943, 0.0],
[0.93093605, 0.0],
[0.49400229, 0.0],
[0.2563728, 0.0],
[0.60688866, 0.0],
[0.26627661, 0.0],
[0.17639742, 0.0],
[0.12579375, 0.0],
[0.94793767, 1.0],
[0.57164863, 1.0],
[0.05673398, 1.0],
[0.27555594, 1.0],
[0.89225699, 1.0],
[0.73987146, 1.0],
[0.06209851, 1.0],
[0.91578623, 1.0],
[0.38951871, 1.0],
[0.25707253, 1.0],
[0.92527423, 2.0],
[0.12519213, 2.0],
[0.27104576, 2.0],
[0.24123346, 2.0],
[0.77848324, 2.0],
[0.45188542, 2.0],
[0.12848926, 2.0],
[0.67861482, 2.0],
[0.43295772, 2.0],
[0.26668177, 2.0],
],
dtype=tf.float64,
)
Z = tf.constant([[0.0], [0.5], [1.0]], dtype=tf.float64)
result = tf.constant(
[
[0.11333948, 0.0],
[0.68914756, 0.0],
[0.49686943, 0.0],
[0.93093605, 0.0],
[0.49400229, 0.0],
[0.2563728, 0.0],
[0.60688866, 0.0],
[0.26627661, 0.0],
[0.17639742, 0.0],
[0.12579375, 0.0],
[0.94793767, 0.5],
[0.57164863, 0.5],
[0.05673398, 0.5],
[0.27555594, 0.5],
[0.89225699, 0.5],
[0.73987146, 0.5],
[0.06209851, 0.5],
[0.91578623, 0.5],
[0.38951871, 0.5],
[0.25707253, 0.5],
[0.92527423, 1.0],
[0.12519213, 1.0],
[0.27104576, 1.0],
[0.24123346, 1.0],
[0.77848324, 1.0],
[0.45188542, 1.0],
[0.12848926, 1.0],
[0.67861482, 1.0],
[0.43295772, 1.0],
[0.26668177, 1.0],
],
dtype=tf.float64,
)
assert np.all(_concat_inputs_with_latents(Z, X) == result)