# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE.md file in the project root
# for full license information.
# ==============================================================================

from __future__ import print_function
import numpy as np
import os
import cntk as C

# Paths relative to current python file.
abs_path   = os.path.dirname(os.path.abspath(__file__))
data_path  = os.path.join(abs_path, "..", "DataSets", "MNIST")
model_path = os.path.join(abs_path, "Output", "Models")

# Define the reader for both the training and evaluation actions.
def create_reader(path, is_training, input_dim, label_dim):
    return C.io.MinibatchSource(C.io.CTFDeserializer(path, C.io.StreamDefs(
        features = C.io.StreamDef(field='features', shape=input_dim),
        labels   = C.io.StreamDef(field='labels',   shape=label_dim)
    )), randomize=is_training, max_sweeps=C.io.INFINITELY_REPEAT if is_training else 1)

# Trains and tests a simple autoencoder for MNIST images using deconvolution.
def deconv_mnist(max_epochs=3):
    image_height = 28
    image_width  = 28
    num_channels = 1
    input_dim = image_height * image_width * num_channels
    num_output_classes = 10

    # Input variable and normalization (0.00390625 = 1/256 scales raw pixel
    # values from [0, 255] into [0, 1)).
    input_var = C.ops.input_variable((num_channels, image_height, image_width), np.float32)
    scaled_input = C.ops.element_times(C.ops.constant(0.00390625), input_var, name="input_node")

    # Define the autoencoder model: convolution -> max-pooling -> unpooling -> deconvolution.
    cMap    = 1
    conv1   = C.layers.Convolution2D((5,5), cMap, pad=True, activation=C.ops.relu)(scaled_input)
    pool1   = C.layers.MaxPooling((4,4), (4,4), name="pooling_node")(conv1)
    unpool1 = C.layers.MaxUnpooling((4,4), (4,4))(pool1, conv1)
    z       = C.layers.ConvolutionTranspose2D((5,5), num_channels, pad=True, bias=False,
                                              init=C.glorot_uniform(0.001), name="output_node")(unpool1)

    # Define the RMSE loss against the scaled input. The scaling is recomputed
    # here rather than reusing 'scaled_input' (ideally this would simply be
    # 'err = C.ops.minus(z, scaled_input)').
    f2        = C.ops.element_times(C.ops.constant(0.00390625), input_var)
    err       = C.ops.reshape(C.ops.minus(z, f2), (784))
    sq_err    = C.ops.element_times(err, err)
    mse       = C.ops.reduce_mean(sq_err)
    rmse_loss = C.ops.sqrt(mse)
    rmse_eval = C.ops.sqrt(mse)

    reader_train = create_reader(os.path.join(data_path, 'Train-28x28_cntk_text.txt'), True, input_dim, num_output_classes)

    # Training config.
    epoch_size     = 60000
    minibatch_size = 64

    # Set learning parameters.
    lr_schedule = C.learning_parameter_schedule_per_sample([0.00015], epoch_size=epoch_size)
    mm_schedule = C.learners.momentum_schedule_per_sample([0.9983347214509387], epoch_size=epoch_size)

    # Instantiate the trainer object to drive the model training.
    learner = C.learners.momentum_sgd(z.parameters, lr_schedule, mm_schedule, unit_gain=True)
    progress_printer = C.logging.ProgressPrinter(tag='Training')
    trainer = C.Trainer(z, (rmse_loss, rmse_eval), learner, progress_printer)

    # Define mapping from reader streams to network inputs.
    input_map = {
        input_var : reader_train.streams.features
    }

    C.logging.log_number_of_parameters(z) ; print()

    # Get minibatches of images to train with and perform model training.
    for epoch in range(max_epochs):       # loop over epochs
        sample_count = 0
        while sample_count < epoch_size:  # loop over minibatches in the epoch
            data = reader_train.next_minibatch(min(minibatch_size, epoch_size - sample_count), input_map=input_map)  # fetch minibatch
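            # 'data' maps input_var to a MinibatchData object; its num_samples
            # (read below) reflects the actual count, which can be smaller than
            # minibatch_size for the final batch of an epoch.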
            trainer.train_minibatch(data)                # update model with it
            sample_count += data[input_var].num_samples  # count samples processed so far

        trainer.summarize_training_progress()
        z.save(os.path.join(model_path, "07_Deconvolution_PY_{}.model".format(epoch)))

    # Rename the final model.
    last_model_name  = os.path.join(model_path, "07_Deconvolution_PY_{}.model".format(max_epochs - 1))
    final_model_name = os.path.join(model_path, "07_Deconvolution_PY.model")
    try:
        os.remove(final_model_name)
    except OSError:
        pass
    os.rename(last_model_name, final_model_name)

    # Load test data.
    reader_test = create_reader(os.path.join(data_path, 'Test-28x28_cntk_text.txt'), False, input_dim, num_output_classes)

    input_map = {
        input_var : reader_test.streams.features
    }

    # Test configuration for the trained model.
    epoch_size     = 10000
    minibatch_size = 1024

    # Process minibatches and evaluate the model.
    metric_numer    = 0
    metric_denom    = 0
    sample_count    = 0
    minibatch_index = 0

    while sample_count < epoch_size:
        current_minibatch = min(minibatch_size, epoch_size - sample_count)

        # Fetch the next test minibatch.
        data = reader_test.next_minibatch(current_minibatch, input_map=input_map)

        # Evaluate the minibatch and accumulate the metric.
        metric_numer += trainer.test_minibatch(data) * current_minibatch
        metric_denom += current_minibatch

        # Keep track of the number of samples processed so far.
        sample_count += data[input_var].num_samples
        minibatch_index += 1

    print("")
    print("Final Results: Minibatch[1-{}]: errs = {:0.2f}% * {}".format(minibatch_index, (metric_numer*100.0)/metric_denom, metric_denom))
    print("")

    return metric_numer/metric_denom

if __name__=='__main__':
    deconv_mnist()
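
# ---------------------------------------------------------------------------
# Usage sketch appended for illustration (not part of the original sample):
# reloads the saved model and reconstructs a single image. The function name
# and the example input below are assumptions; the saved graph already contains
# the 1/256 input scaling, so raw pixel values in [0, 255] can be fed directly.
def reconstruct_image(model_file, image):
    model = C.load_model(model_file)  # reload the full function graph saved by z.save(...)
    image = np.asarray(image, dtype=np.float32).reshape(1, 28, 28)
    return model.eval({model.arguments[0]: [image]})  # reconstructed 1x28x28 image

# Hypothetical example: reconstruct a random "image" with the trained model.
# out = reconstruct_image(os.path.join(model_path, "07_Deconvolution_PY.model"),
#                         np.random.randint(0, 256, (28, 28)))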