Raw File
onnx_format_test.py
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE.md file in the project root
# for full license information.
# ==============================================================================

import os
import numpy as np
import cntk as C
from cntk.ops.tests.ops_test_utils import cntk_device
import pytest

def test_load_save_constant(tmpdir):
    c = C.constant(value=[1,3])
    root_node = c * 5

    result = root_node.eval()
    expected = [[[[5,15]]]]
    assert np.allclose(result, expected)

    filename = os.path.join(str(tmpdir), R'constant.onnx')
    root_node.save(filename, format=C.ModelFormat.ONNX)

    loaded_node = C.Function.load(filename, format=C.ModelFormat.ONNX)
    assert root_node.shape == loaded_node.shape

    loaded_result = loaded_node.eval()
    assert np.allclose(loaded_result, expected)

def test_dense_layer(tmpdir):
    pytest.skip('Needs to be fixed after removal of batch axis change.')
    img_shape = (1, 5, 5)
    img = np.asarray(np.random.uniform(-1, 1, img_shape), dtype=np.float32)

    x = C.input_variable(img.shape)
    root_node = C.layers.Dense(5, activation=C.softmax)(x)
    
    filename = os.path.join(str(tmpdir), R'dense_layer.onnx')
    root_node.save(filename, format=C.ModelFormat.ONNX)

    loaded_node = C.Function.load(filename, format=C.ModelFormat.ONNX)
    assert root_node.shape == loaded_node.shape

    x_ = loaded_node.arguments[0]
    assert np.allclose(loaded_node.eval({x_:img}), root_node.eval({x:img}))

CONVOLUTION_TEST_DATA = [
    # auto_padding: Value for the auto_adding parameter to convolution. 
    ([False, True, True]   # Equivalent to "SAME_UPPER".
     ),
    ([False, False, False] # Equivalent to "VALID" padding.     
     ),
    ([False, False, True]  # Equivalent to "VALID" padding.     
     ),
    ([False, True, False]  # Equivalent to "VALID" padding.     
     )
]
# This is a roundtrip test. It saves a CNTK convolution node in ONNX format (with different padding options), 
# and loads it back to check that the same results are produced.
@pytest.mark.parametrize("auto_padding", CONVOLUTION_TEST_DATA)
def test_convolution(tmpdir, auto_padding):
    pytest.skip('Needs to be fixed after removal of batch axis change.')
    img_shape = (1, 5, 5)
    img = np.asarray(np.random.uniform(-1, 1, img_shape), dtype=np.float32)

    x = C.input_variable(img.shape)
    filter = np.reshape(np.array([2, -1, -1, 2], dtype = np.float32), (1, 1, 2, 2))
    kernel = C.constant(value = filter)
    root_node = C.convolution(kernel, x, auto_padding=auto_padding)

    filename = os.path.join(str(tmpdir), R'conv.onnx')
    root_node.save(filename, format=C.ModelFormat.ONNX)

    loaded_node = C.Function.load(filename, format=C.ModelFormat.ONNX)
    assert root_node.shape == loaded_node.shape

    x_ = loaded_node.arguments[0]
    assert np.allclose(loaded_node.eval({x_:[img]}), root_node.eval({x:[img]}))

DType_Config = (np.float32, np.float16)
@pytest.mark.parametrize("dtype", DType_Config)
def test_convolution_transpose(tmpdir, dtype, device_id):
    pytest.skip('Needs to be fixed after removal of batch axis change.')
    if device_id == -1 and dtype == np.float16:
        pytest.skip('Test only runs on GPU')
    device = cntk_device(device_id)
    with C.default_options(dtype=dtype):
        img_shape = (1, 3, 3)
        img = np.asarray(np.random.uniform(-1, 1, img_shape), dtype=dtype)

        x = C.input_variable(img.shape)
        filter = np.reshape(np.array([2, -1, -1, 2], dtype=dtype), (1, 2, 2))
        kernel = C.constant(value = filter)
        root_node = C.convolution_transpose(kernel, x, auto_padding=[False], output_shape=(1, 4, 4))

        filename = os.path.join(str(tmpdir), R'conv_transpose.onnx')
        root_node.save(filename, format=C.ModelFormat.ONNX)

        loaded_node = C.Function.load(filename, format=C.ModelFormat.ONNX)
        assert root_node.shape == loaded_node.shape

        x_ = loaded_node.arguments[0]
        assert np.allclose(loaded_node.eval({x_:[img]}, device=device), root_node.eval({x:[img]}, device=device))

POOLING_TEST_DATA = [
    # auto_padding: Value for the auto_adding parameter to pooling. 
    ([True, True],   # Equivalent to "SAME_UPPER".
    True                    # pooling_type: True := MaxPooling, False := AveragePooling
     ),
    ([False, False], # Equivalent to "VALID" padding.
    True
     ),
    ([False, True],  # Equivalent to "VALID" padding.
    True     
     ),
    ([True, True],   # Equivalent to "SAME_UPPER".
    False
     ),
    ([False, False], # Equivalent to "VALID" padding.
    False
     ),
    # Enable this test case when we fix ONNX AveragePool to match even for edge pixels in {False, True] case.}
    # ([False, True],  # Equivalent to "VALID" padding.
    # False     
    #  )
]
# This is a roundtrip test. It saves a CNTK pooling node in ONNX format (with different padding options), 
# and loads it back to check that the same results are produced.
@pytest.mark.parametrize("auto_padding, pooling_type", POOLING_TEST_DATA)
@pytest.mark.parametrize("dtype", DType_Config)
def test_pooling(tmpdir, auto_padding, pooling_type, dtype, device_id):
    pytest.skip('Needs to be fixed after removal of batch axis change.')
    if device_id == -1 and dtype == np.float16:
        pytest.skip('Test only runs on GPU')
    device = cntk_device(device_id)
    with C.default_options(dtype=dtype):
        img_shape = (1, 5, 5)
        img = np.asarray(np.random.uniform(-1, 1, img_shape), dtype=dtype)

        x = C.input_variable(img.shape)    
        pool_type = C.MAX_POOLING if pooling_type else C.AVG_POOLING
        root_node = C.pooling(x, pool_type, (2, 2), auto_padding=auto_padding)

        filename = os.path.join(str(tmpdir), R'conv.onnx')
        root_node.save(filename, format=C.ModelFormat.ONNX)

        loaded_node = C.Function.load(filename, format=C.ModelFormat.ONNX)
        assert root_node.shape == loaded_node.shape

        x_ = loaded_node.arguments[0]
        assert np.allclose(loaded_node.eval({x_:[img]}, device=device), root_node.eval({x:[img]}, device=device))

def test_conv_model(tmpdir):
    pytest.skip('Needs to be fixed after removal of batch axis change.')
    def create_model(input):
        with C.layers.default_options(init=C.glorot_uniform(), activation=C.relu):
            model = C.layers.Sequential([
                C.layers.For(range(3), lambda i: [
                    C.layers.Convolution((5,5), [32,32,64][i], pad=True),
                    C.layers.MaxPooling((3,3), strides=(2,2))
                    ]),
                C.layers.Dense(64),
                C.layers.Dense(10, activation=None)
            ])

        return model(input)

    img_shape = (3, 32, 32)
    img = np.asarray(np.random.uniform(-1, 1, img_shape), dtype=np.float32)

    x = C.input_variable(img.shape)
    root_node = create_model(x)

    filename = os.path.join(str(tmpdir), R'conv_model.onnx')
    root_node.save(filename, format=C.ModelFormat.ONNX)

    loaded_node = C.Function.load(filename, format=C.ModelFormat.ONNX)
    assert root_node.shape == loaded_node.shape

    x_ = loaded_node.arguments[0]
    assert np.allclose(loaded_node.eval({x_:img}), root_node.eval({x:img}))

def test_batch_norm_model(tmpdir):
    pytest.skip('Needs to be fixed after removal of batch axis change.')
    image_height = 32
    image_width  = 32
    num_channels = 3
    num_classes  = 10

    input_var = C.input_variable((num_channels, image_height, image_width))
    label_var = C.input_variable((num_classes))
    def create_basic_model_with_batch_normalization(input, out_dims):
        with C.layers.default_options(activation=C.relu, init=C.glorot_uniform()):
            model = C.layers.Sequential([
                C.layers.For(range(3), lambda i: [
                    C.layers.Convolution((5,5), [image_width,image_height,64][i], pad=True),
                    C.layers.BatchNormalization(map_rank=1),
                    C.layers.MaxPooling((3,3), strides=(2,2))
                ]),
                C.layers.Dense(64),
                C.layers.BatchNormalization(map_rank=1),
                C.layers.Dense(out_dims, activation=None)
            ])

        return model(input)

    feature_scale = 1.0 / 256.0

    #TODO: ONNX only do right hand-side broadcast. This test fails 
    # if input_var, feature_scale is swapped. 
    input_var_norm = C.element_times(input_var, feature_scale)
    
    # apply model to input
    z = create_basic_model_with_batch_normalization(input_var_norm, out_dims=10)

    filename = os.path.join(str(tmpdir), R'bn_model.onnx')
    z.save(filename, format=C.ModelFormat.ONNX)

    loaded_node = C.Function.load(filename, format=C.ModelFormat.ONNX)
    assert z.shape == loaded_node.shape

    img_shape = (num_channels, image_width, image_height)
    img = np.asarray(np.random.uniform(-1, 1, img_shape), dtype=np.float32)

    x = z.arguments[0];
    x_ = loaded_node.arguments[0]
    assert np.allclose(loaded_node.eval({x_:img}), z.eval({x:img}))

def test_vgg9_model(tmpdir):
    pytest.skip('Needs to be fixed after removal of batch axis change.')
    def create_model(input):
        with C.layers.default_options(activation=C.relu, init=C.glorot_uniform()):
            model = C.layers.Sequential([
                C.layers.For(range(3), lambda i: [
                    C.layers.Convolution((3,3), [64,96,128][i], pad=True),
                    C.layers.Convolution((3,3), [64,96,128][i], pad=True),
                    C.layers.MaxPooling((3,3), strides=(2,2))
                ]),
                C.layers.For(range(2), lambda : [
                    C.layers.Dense(1024)
                ]),
                C.layers.Dense(10, activation=None)
            ])
        
        return model(input)

    img_shape = (3, 32, 32)
    img = np.asarray(np.random.uniform(-1, 1, img_shape), dtype=np.float32)

    x = C.input_variable(img.shape)
    root_node = create_model(x)

    filename = os.path.join(str(tmpdir), R'vgg9_model.onnx')
    root_node.save(filename, format=C.ModelFormat.ONNX)

    loaded_node = C.Function.load(filename, format=C.ModelFormat.ONNX)
    assert root_node.shape == loaded_node.shape

    x_ = loaded_node.arguments[0]
    assert np.allclose(loaded_node.eval({x_:img}), root_node.eval({x:img}))

    # Additional test to ensure that loaded_node can be saved as both ONNX and CNTKv2 again.
    filename2 = os.path.join(str(tmpdir), R'vgg9_model2.onnx')
    loaded_node.save(filename2, format=C.ModelFormat.ONNX)

    filename3 = os.path.join(str(tmpdir), R'vgg9_model2.cntkmodel')
    loaded_node.save(filename3, format=C.ModelFormat.CNTKv2)

def test_conv3d_model(tmpdir):
    pytest.skip('Needs to be fixed after removal of batch axis change.')
    def create_model(input):
        with C.default_options (activation=C.relu):
            model = C.layers.Sequential([
                    C.layers.Convolution3D((3,3,3), 64, pad=True),
                    C.layers.MaxPooling((1,2,2), (1,2,2)),
                    C.layers.For(range(3), lambda i: [
                        C.layers.Convolution3D((3,3,3), [96, 128, 128][i], pad=True),
                        C.layers.Convolution3D((3,3,3), [96, 128, 128][i], pad=True),
                        C.layers.MaxPooling((2,2,2), (2,2,2))
                    ]),
                    C.layers.For(range(2), lambda : [
                        C.layers.Dense(1024), 
                        C.layers.Dropout(0.5)
                    ]),
                C.layers.Dense(100, activation=None)
            ])

        return model(input)

    video_shape = (3, 20, 32, 32)
    video = np.asarray(np.random.uniform(-1, 1, video_shape), dtype=np.float32)

    x = C.input_variable(video.shape)
    root_node = create_model(x)

    filename = os.path.join(str(tmpdir), R'conv3d_model.onnx')
    root_node.save(filename, format=C.ModelFormat.ONNX)

    loaded_node = C.Function.load(filename, format=C.ModelFormat.ONNX)
    assert root_node.shape == loaded_node.shape

    x_ = loaded_node.arguments[0]
    assert np.allclose(loaded_node.eval({x_:video}), root_node.eval({x:video}))

    # Additional test to ensure that loaded_node can be saved as both ONNX and CNTKv2 again.
    filename2 = os.path.join(str(tmpdir), R'conv3d_model2.onnx')
    loaded_node.save(filename2, format=C.ModelFormat.ONNX)

    filename3 = os.path.join(str(tmpdir), R'conv3d_model2.cntkmodel')
    loaded_node.save(filename3, format=C.ModelFormat.CNTKv2)

def test_resnet_model(tmpdir):
    pytest.skip('Needs to be fixed after removal of batch axis change.')
    def convolution_bn(input, filter_size, num_filters, strides=(1,1), init=C.normal(0.01), activation=C.relu):
        r = C.layers.Convolution(filter_size, 
                                 num_filters, 
                                 strides=strides, 
                                 init=init, 
                                 activation=None, 
                                 pad=True, bias=False)(input)
        r = C.layers.BatchNormalization(map_rank=1)(r)
        r = r if activation is None else activation(r)    
        return r

    def resnet_basic(input, num_filters):
        c1 = convolution_bn(input, (3,3), num_filters)
        c2 = convolution_bn(c1, (3,3), num_filters, activation=None)
        p  = c2 + input
        return C.relu(p)

    def resnet_basic_inc(input, num_filters):
        c1 = convolution_bn(input, (3,3), num_filters, strides=(2,2))
        c2 = convolution_bn(c1, (3,3), num_filters, activation=None)

        s = convolution_bn(input, (1,1), num_filters, strides=(2,2), activation=None)
    
        p = c2 + s
        return C.relu(p)

    def resnet_basic_stack(input, num_filters, num_stack):
        assert (num_stack > 0)
    
        r = input
        for _ in range(num_stack):
            r = resnet_basic(r, num_filters)
        return r

    def create_model(input):
        conv = convolution_bn(input, (3,3), 16)
        r1_1 = resnet_basic_stack(conv, 16, 3)

        r2_1 = resnet_basic_inc(r1_1, 32)
        r2_2 = resnet_basic_stack(r2_1, 32, 2)

        r3_1 = resnet_basic_inc(r2_2, 64)
        r3_2 = resnet_basic_stack(r3_1, 64, 2)

        # Global average pooling
        pool = C.layers.AveragePooling(filter_shape=(8,8), strides=(1,1))(r3_2)    
        return C.layers.Dense(10, init=C.normal(0.01), activation=None)(pool)

    img_shape = (3, 32, 32)
    img = np.asarray(np.random.uniform(0, 1, img_shape), dtype=np.float32)

    x = C.input_variable(img.shape)
    root_node = create_model(x)

    filename = os.path.join(str(tmpdir), R'resnet_model.onnx')
    root_node.save(filename, format=C.ModelFormat.ONNX)

    loaded_node = C.Function.load(filename, format=C.ModelFormat.ONNX)
    assert root_node.shape == loaded_node.shape

    x_ = loaded_node.arguments[0]
    assert np.allclose(loaded_node.eval({x_:img}), root_node.eval({x:img}))

    # Additional test to ensure that loaded_node can be saved as both ONNX and CNTKv2 again.
    filename2 = os.path.join(str(tmpdir), R'resnet_model2.onnx')
    loaded_node.save(filename2, format=C.ModelFormat.ONNX)

    filename3 = os.path.join(str(tmpdir), R'resnet_model2.cntkmodel')
    loaded_node.save(filename3, format=C.ModelFormat.CNTKv2)


@pytest.mark.parametrize("dtype", DType_Config)
def test_conv_with_freedim_model(tmpdir, dtype, device_id):
    pytest.skip('Needs to be fixed after removal of batch axis change.')
    if device_id == -1 and dtype == np.float16:
        pytest.skip('Test only runs on GPU')
    device = cntk_device(device_id)
    with C.default_options(dtype=dtype):
        img_shape = (3, 32, 32)
        img = np.asarray(np.random.uniform(-1, 1, img_shape), dtype=dtype)

        x = C.input_variable((3, C.FreeDimension, C.FreeDimension))

        conv_size1 = (32, 3, 5, 5)
        conv_map1 = C.constant(value=np.arange(np.prod(conv_size1), dtype=dtype).reshape(conv_size1))
        conv_op1 = C.convolution(conv_map1, x, auto_padding=(False, True, True))
        relu_op1 = C.relu(conv_op1)
        maxpool_op1 = C.pooling(relu_op1, C.MAX_POOLING, (2, 2), (2, 2))

        conv_size2 = (64, 32, 3, 3)
        conv_map2 = C.constant(value=np.arange(np.prod(conv_size2), dtype=dtype).reshape(conv_size2))
        conv_op2 = C.convolution(conv_map2, maxpool_op1, auto_padding=(False, True, True))
        relu_op2 = C.relu(conv_op2)
        root_node = C.pooling(relu_op2, C.MAX_POOLING, (2, 2), (2, 2))

        filename = os.path.join(str(tmpdir), R'conv_with_freedim.onnx')
        root_node.save(filename, format=C.ModelFormat.ONNX)

        loaded_node = C.Function.load(filename, format=C.ModelFormat.ONNX)
        assert root_node.shape == loaded_node.shape

        x_ = loaded_node.arguments[0]
        assert np.allclose(loaded_node.eval({x_:img}, device=device), root_node.eval({x:img}, device=device))

        # Additional test to ensure that loaded_node can be saved as both ONNX and CNTKv2 again.
        filename2 = os.path.join(str(tmpdir), R'conv_with_freedim2.onnx')
        loaded_node.save(filename2, format=C.ModelFormat.ONNX)

        filename3 = os.path.join(str(tmpdir), R'conv_with_freedim2.cntkmodel')
        loaded_node.save(filename3, format=C.ModelFormat.CNTKv2)

def test_save_no_junk(tmpdir):
    """ Test for an issue with save() not having O_TRUNC mode
    resulting in possible junk at the end of file if it already exists
    """
    try:
        import onnx
    except ImportError:
        pytest.skip('ONNX is not installed.')

    filename = os.path.join(str(tmpdir), R'no_junk.onnx')
    filename_new = os.path.join(str(tmpdir), R'no_junk_new.onnx')

    # Save a large model.
    g = C.ceil([1, 2, 3, 4, 5, 6, 7, 8] * 100)
    g.save(filename, format=C.ModelFormat.ONNX)

    # Save a smaller model using the same file name and a new file name.
    g = C.ceil([1, 2, 3, 4, 5, 6, 7] * 100)
    g.save(filename, format=C.ModelFormat.ONNX)
    g.save(filename_new, format=C.ModelFormat.ONNX)

    # Check the files can be loaded as standard ONNX files,
    # and both result in the same model.
    assert onnx.load(filename) == onnx.load(filename_new)
back to top