# persist_test.py
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE.md file in the project root
# for full license information.
# ==============================================================================

import numpy as np

import cntk as C
from cntk.debugging import save_as_legacy_model

# TODO: a test for restore_model?

def test_load_save_constant(tmpdir):
    """Check that a constant-only graph evaluates the same after save/load.

    The graph multiplies ``C.constant([1, 3])`` by 5.  It is evaluated
    directly, then written once with ``Function.save`` and once with
    ``save_as_legacy_model``; each file is read back with
    ``C.Function.load`` and must evaluate to the same expected values.
    """
    expected = [[[[5, 15]]]]
    root_node = C.constant(value=[1, 3]) * 5
    assert np.allclose(root_node.eval(), expected)

    def _reloaded_output(path):
        # The graph has no inputs, so eval() is called without arguments.
        return C.Function.load(path).eval()

    # Round trip through the regular model format.
    native_path = str(tmpdir / 'c_plus_c.mod')
    root_node.save(native_path)
    assert np.allclose(_reloaded_output(native_path), expected)

    # Round trip through the legacy model format.
    legacy_path = native_path + '.legacy'
    save_as_legacy_model(root_node, legacy_path)
    assert np.allclose(_reloaded_output(legacy_path), expected)

def test_load_save_input_legacy_names(tmpdir):
    """Check save/load of a one-input graph fed positionally after loading.

    The graph is ``abs(i1)`` with ``i1`` of shape ``(1, 2)``.  The original
    function is evaluated with a variable-to-value mapping; the functions
    loaded from the regular and the legacy file are evaluated with a list
    of input values, i.e. inputs are specified by order.
    """
    i1 = C.input_variable((1, 2), name='i1')
    root_node = abs(i1)
    input1 = [[[-1, 2]]]
    expected = [[[[1, 2]]]]

    assert np.allclose(root_node.eval({i1: input1}), expected)

    native_path = str(tmpdir / 'i_plus_c_0.mod')
    legacy_path = native_path + '.legacy'

    # Regular format: inputs of the loaded function are given by order.
    root_node.save(native_path)
    restored = C.Function.load(native_path)
    assert np.allclose(restored.eval([input1]), expected)

    # Legacy format: same positional feeding.
    save_as_legacy_model(root_node, legacy_path)
    restored = C.Function.load(legacy_path)
    assert np.allclose(restored.eval([input1]), expected)

def test_load_save_inputs(tmpdir):
    """Check save/load of a two-input graph fed by input name after loading.

    The graph is ``C.plus(i1, i2)`` with ``i1`` of shape ``(1, 2)`` and
    ``i2`` of shape ``(2, 1)``.  The original function is evaluated with a
    variable-to-value mapping; the functions loaded from the regular and
    the legacy file are evaluated with a mapping keyed by the input names
    ``'i1'`` and ``'i2'``.
    """
    i1 = C.input_variable((1, 2), name='i1')
    i2 = C.input_variable((2, 1), name='i2')
    root_node = C.plus(i1, i2)

    input1 = [[[1, 2]]]
    input2 = [[[[1], [2]]]]
    expected = [[[[2, 3], [3, 4]]]]

    assert np.allclose(root_node.eval({i1: input1, i2: input2}), expected)

    def _eval_by_name(path):
        # Load the stored model and address its inputs by name.
        restored = C.Function.load(path)
        return restored.eval({'i1': input1, 'i2': input2})

    native_path = str(tmpdir / 'i_plus_i_0.mod')
    root_node.save(native_path)
    assert np.allclose(_eval_by_name(native_path), expected)

    legacy_path = native_path + '.legacy'
    save_as_legacy_model(root_node, legacy_path)
    assert np.allclose(_eval_by_name(legacy_path), expected)

def test_load_save_unique_input(tmpdir):
    """Check save/load of a single-input graph fed with a bare value.

    The graph is ``C.softmax(i1)`` with ``i1`` of shape ``(1, 2)``.  Both
    the original function and the functions loaded from the regular and
    the legacy file are evaluated by passing the input value directly,
    without a mapping, since there is only one input.
    """
    i1 = C.input_variable((1, 2), name='i1')
    root_node = C.softmax(i1)

    input1 = [[[1, 2]]]
    expected = [[[[0.268941, 0.731059]]]]
    assert np.allclose(root_node.eval(input1), expected)

    def _eval_single_input(path):
        # The only value given is bound to the unique input of the model.
        return C.Function.load(path).eval(input1)

    native_path = str(tmpdir / 'i_plus_0.mod')
    root_node.save(native_path)
    assert np.allclose(_eval_single_input(native_path), expected)

    legacy_path = native_path + '.legacy'
    save_as_legacy_model(root_node, legacy_path)
    assert np.allclose(_eval_single_input(legacy_path), expected)


def test_large_model_serialization_float(tmpdir):
    """Save and reload a model whose file is larger than 2 GiB (float32 sizing).

    The first parameter has shape ``(size[0], size[1])``; the leading
    assert verifies that this many float32 values exceed 2**31 bytes.
    The model (projection, forward and backward LSTM recurrences, two
    dense layers) is saved, the file size is checked, and the reloaded
    model must have the same number of parameters with matching shapes
    and values.
    """
    import os
    from cntk.layers import Recurrence, LSTM, Dense

    type_size = np.dtype(np.float32).itemsize
    two_gb = 2**31
    size = (2097152 + 4, 256, 512, 4096)
    input_dim, projection_dim, hidden_dim = size[0], size[1], size[2]
    # Sanity check: the projection matrix alone is bigger than 2 GiB.
    assert input_dim * projection_dim * type_size > two_gb

    device = C.device.cpu()
    i = C.sequence.input(input_dim)
    w = C.Parameter(
        (input_dim, projection_dim),
        init=C.uniform(3.0, seed=12345),
        device=device,
    )
    e = C.times(i, w)

    # Bidirectional recurrence: keep the last forward and first backward state.
    h_fwd = Recurrence(LSTM(hidden_dim))(e)
    h_bwd = Recurrence(LSTM(hidden_dim), go_backwards=True)(e)
    h_last_fwd = C.sequence.last(h_fwd)
    h_first_bwd = C.sequence.first(h_bwd)
    t = C.splice(h_last_fwd, h_first_bwd)

    z1 = Dense(hidden_dim, activation=C.relu)(t)
    z = Dense(2, activation=None)(z1)

    filename = str(tmpdir / 'test_large_model_serialization_float.out')
    z.save(filename)

    assert os.path.getsize(filename) > two_gb

    y = C.Function.load(filename, device=device)

    assert (len(z.parameters) == len(y.parameters))

    # Parameters are compared pairwise in the order both functions list them.
    for saved_param, loaded_param in zip(z.parameters, y.parameters):
        assert saved_param.shape == loaded_param.shape
        assert np.allclose(saved_param.value, loaded_param.value)


# TODO: once the layers lib understands the dtype parameter,
# this should be merged with the test above:
# for dtype in [np.float32, np.float64]:
#    ....
def test_large_model_serialization_double(tmpdir):
    """Save and reload a float64 model whose file is larger than 2 GiB.

    A single float64 parameter vector of ``size`` elements is created so
    that its raw data exceeds 2**31 bytes.  The model ``times(i, w)`` is
    saved, the file size is checked, and the reloaded model must have the
    same number of parameters with matching shapes and values.
    """
    import os

    two_gb = 2**31
    type_size = np.dtype(np.float64).itemsize
    # Floor division keeps ``size`` an int: it is used as a tensor
    # dimension below, and ``/`` would yield a float under Python 3.
    size = two_gb // type_size + 10

    assert size * type_size > two_gb

    device = C.device.cpu()
    i = C.sequence.input(size, dtype=np.float64)
    w = C.Parameter((size,), dtype=np.float64,
        init=C.uniform(3.0, seed=12345), device=device)
    z = C.times(i, w)

    filename = str(tmpdir / 'test_large_model_serialization_double.out')
    z.save(filename)

    assert os.path.getsize(filename) > two_gb

    y = C.Function.load(filename, device=device)

    assert (len(z.parameters) == len(y.parameters))

    # Parameters are compared pairwise in the order both functions list them.
    for saved_param, loaded_param in zip(z.parameters, y.parameters):
        assert saved_param.shape == loaded_param.shape
        assert np.allclose(saved_param.value, loaded_param.value)


def test_restore_constants(tmpdir):
    """Check that constants and parameters come back from files as saved.

    Every constant and parameter of a BatchNormalization layer is
    overwritten with a marker value before each save, so that the value
    seen after ``Trainer.restore_from_checkpoint``, ``Function.load`` and
    ``Function.restore`` identifies which file the data came from.
    """
    C.device.try_set_default_device(C.device.cpu())

    def _fill(var, marker):
        # Non-scalar variables keep their array shape; scalars get a
        # fresh float32 array.
        if len(var.shape) > 0:
            var.value = 0 * var.value + marker
        else:
            var.value = np.array(marker, dtype=np.float32)

    def _fill_all(func, marker):
        for var in func.constants + func.parameters:
            _fill(var, marker)

    def _expect_all(func, marker):
        for var in func.constants + func.parameters:
            assert (var.value == marker).all()

    x = C.input_variable(10)
    f = C.layers.BatchNormalization()(x)
    learner = C.sgd(f.parameters, C.learning_parameter_schedule_per_sample(0.1))
    trainer = C.Trainer(f, C.reduce_sum(f), learner)

    model_filename = str(tmpdir / 'function.out')
    checkpoint_filename = str(tmpdir / 'checkpoint.out')

    # Model file holds marker 1; saving must not change the live values.
    _fill_all(f, 1)
    f.save(model_filename)
    _expect_all(f, 1)

    # Checkpoint file holds marker 2.
    _fill_all(f, 2)
    trainer.save_checkpoint(checkpoint_filename)
    _expect_all(f, 2)

    # Restoring the checkpoint replaces marker 3 with marker 2.
    _fill_all(f, 3)
    _expect_all(f, 3)
    trainer.restore_from_checkpoint(checkpoint_filename)
    _expect_all(f, 2)

    # A function loaded from the model file carries marker 1.
    f2 = C.Function.load(model_filename)
    _expect_all(f2, 1)

    # Restoring f from the model file replaces marker 4 with marker 1.
    _fill_all(f, 4)
    _expect_all(f, 4)
    f.restore(model_filename)
    _expect_all(f, 1)

    # The loaded function's variables can be overwritten as well.
    _fill_all(f2, 5)
    _expect_all(f2, 5)


def test_replace_save_restoreinplace_constant(tmpdir):
    """Check save + in-place restore of a block with a replaced placeholder.

    A block with two placeholders is built with ``as_block``; one of them
    is replaced by a constant.  The block is saved and then restored from
    that file, after which exactly one placeholder must remain.
    """
    c1, c2, c3 = (C.constant(value=0) for _ in range(3))
    p1 = C.placeholder(name="placeholder1")
    p2 = C.placeholder(name="placeholder2")
    result = (c1 * p1) * c2 + c3 + p2

    # Outer placeholders that the block's inner placeholders are mapped to.
    p3 = C.placeholder(name="placeholder3")
    p4 = C.placeholder(name="placeholder4")
    block = C.ops.as_block(result, [(p2, p4), (p1, p3)], "test_block")

    # Bind p3 to a constant; p4 is left unbound.
    block.replace_placeholders({p3: C.constant(value=0)})

    model_filename = str(tmpdir / 'simple_block.mod')
    block.save(model_filename)
    block.restore(model_filename)

    assert len(block.placeholders) == 1
back to top