https://github.com/Microsoft/CNTK
Raw File
Tip revision: 9d6345b2fa794c38b4f02f71631ea71401b97362 authored by Frank Seide on 24 August 2016, 15:05:13 UTC
Merge branch 'master' of https://github.com/Microsoft/CNTK into fseide/kdd
Tip revision: 9d6345b
MinibatchSource.cpp
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//

#include "stdafx.h"
#include "CNTKLibrary.h"
#include "Utils.h"
#include "Config.h"
#include "MinibatchSource.h"
#include "HeapMemoryProvider.h"
#include "ReaderShim.h"
#include "Function.h"
#include <tuple>
#include "ComputationNetworkBuilder.h"

using namespace Microsoft::MSR::CNTK;

namespace CNTK
{
    MinibatchSourcePtr CreateCompositeMinibatchSource(const Dictionary& configuration)
    {
        return MinibatchSourcePtr(new CompositeMinibatchSource(configuration));
    }

    CompositeMinibatchSource::CompositeMinibatchSource(const Dictionary& configuration)
        : m_epochEndReached(false), m_prevMinibatchSize(0), m_epochSize(SIZE_MAX)
    {
        ConfigParameters config;
        std::wstringstream s;
        for (const auto& keyValuePair : *(configuration.m_dictionaryData))
            AddConfigString(s, keyValuePair.first, keyValuePair.second, 0);

        config.Parse(msra::strfun::utf8(s.str()));

        const wchar_t* epochSizeConfigurationKey = L"epochSize";
        if (configuration.Contains(epochSizeConfigurationKey))
            m_epochSize = configuration[epochSizeConfigurationKey].GetValue<size_t>();

        if (m_epochSize == 0)
            m_epochSize = Microsoft::MSR::CNTK::requestDataSize;

        typedef Reader*(*CreateCompositeDataReaderProc)(const ConfigParameters* parameters);
        CreateCompositeDataReaderProc createReaderProc = (CreateCompositeDataReaderProc)Plugin().Load(L"CompositeDataReader", "CreateCompositeDataReader");
        m_compositeDataReader.reset(createReaderProc(&config));

        auto compositeDataReaderStreamDescs = m_compositeDataReader->GetStreamDescriptions();
        for (auto streamDesc : compositeDataReaderStreamDescs)
            m_streamInfos.insert({ streamDesc->m_name, streamDesc->m_id, AsStorageFormat(streamDesc->m_storageType), AsDataType(streamDesc->m_elementType), AsNDShape(*(streamDesc->m_sampleLayout)) });
    }

    /*virtual*/ const std::unordered_map<StreamInfo, MinibatchData>&
    CompositeMinibatchSource::GetNextMinibatch(const std::unordered_map<StreamInfo, std::pair<size_t, size_t>>& perStreamMBSizeLimits,
                                               const DeviceDescriptor& device /*= DeviceDescriptor::DefaultDevice()*/) /*override*/
    {
        m_minibatchData.clear();

        if (!m_epochEndReached)
        {
            // TODO: Support different minibatch sizes for different streams
            size_t requestedMinibatchSizeInSamples = 0;
            for (const auto& val : perStreamMBSizeLimits)
            {
                size_t maxNumSequencesRequested = val.second.first;
                size_t maxNumSamplesRequested = val.second.second;

                // TODO: Specifying minibatch size in #sequences is currently unsupported
                if (maxNumSequencesRequested != 0)
                    LogicError("Specifying minibatch size in #sequences is currently unsupported");

                if (requestedMinibatchSizeInSamples == 0)
                    requestedMinibatchSizeInSamples = maxNumSamplesRequested;
                else
                {
                    if (requestedMinibatchSizeInSamples != maxNumSamplesRequested)
                        LogicError("Different minibatch sizes across different input streams is currently unsupported!");
                }
            }

            if (requestedMinibatchSizeInSamples == 0)
                InvalidArgument("GetNextMinibatch: Requested minibatch sizes must be > 0");

            if (m_prevMinibatchSize == 0)
            {
                // TODO: Add support for distributed reading
                EpochConfiguration epochConfig = { 1, 0, requestedMinibatchSizeInSamples, m_epochSize, 0, 0 };
                m_compositeDataReader->StartEpoch(epochConfig);
                m_prevMinibatchSize = requestedMinibatchSizeInSamples;
            }

            if (requestedMinibatchSizeInSamples != m_prevMinibatchSize)
                LogicError("GetNextMinibatch: Changing minibatch sizes across calls is currently unsupported");

            auto compositeReaderMinibatchData = m_compositeDataReader->ReadMinibatch();
            m_epochEndReached = compositeReaderMinibatchData.m_endOfEpoch;

            auto compositeDataReaderStreamDescs = m_compositeDataReader->GetStreamDescriptions();
            size_t numStreams = compositeDataReaderStreamDescs.size();
            for (size_t i = 0; i < numStreams; ++i)
            {
                auto currentStreamDesc = compositeDataReaderStreamDescs[i];
                auto iter = std::find_if(perStreamMBSizeLimits.begin(), perStreamMBSizeLimits.end(), [currentStreamDesc](const std::pair<StreamInfo, std::pair<size_t, size_t>>& entry) {
                    return entry.first.m_id == currentStreamDesc->m_id;
                });

                if (iter == perStreamMBSizeLimits.end())
                    continue;

                auto& currentStreamInfo = iter->first;
                auto sampleShape = AsNDShape(*(currentStreamDesc->m_sampleLayout));

                ValuePtr minibatchValuePtr;
                if (compositeReaderMinibatchData.m_data.empty())
                {
                    minibatchValuePtr = MakeSharedObject<Value>(MakeSharedObject<NDArrayView>(currentStreamInfo.m_elementType, sampleShape.AppendShape({ 0, 0 }), DeviceDescriptor::CPUDevice()));
                    continue;
                }

                auto currentStreamMinibatchData = compositeReaderMinibatchData.m_data[i];
                if (currentStreamDesc->m_elementType == ElementType::tfloat)
                {
                    auto CNTKMatrixType = (currentStreamDesc->m_storageType == StorageType::dense) ? DENSE : SPARSE;
                    auto CNTKMatrixFormat = (currentStreamDesc->m_storageType == StorageType::dense) ? matrixFormatDense : matrixFormatSparseCSC;
                    auto dataMatrix = std::make_shared<Matrix<float>>(0, 0, CPUDEVICE, CNTKMatrixType, CNTKMatrixFormat);
                    size_t sampleSize = currentStreamDesc->m_sampleLayout->GetNumElements();

                    // TODO: Eliminate the unnecessary CPU to CPU copy
                    ReaderShim<float>::FillMatrixFromStream(currentStreamDesc->m_storageType, dataMatrix.get(), sampleSize, currentStreamMinibatchData);
                    minibatchValuePtr = CompositeFunction::GetValueObjectFromCNTKImplMatrixAndMBLayout<float>(sampleShape, *dataMatrix, currentStreamMinibatchData->m_layout, false);

                    size_t numSamples = currentStreamMinibatchData->m_layout->GetActualNumSamples();
                    size_t numSequences = currentStreamMinibatchData->m_layout->GetNumSequences();

                    m_minibatchData[currentStreamInfo] = { numSequences, numSamples, minibatchValuePtr };
                }
                else
                    LogicError("Input data of type other than DataType::Float is currently unsupported by the CNTK built-in composite MinibatchSource!");
            }
        }

        return m_minibatchData;
    }

    void ComputeInputPerDimMeansAndInvStdDevs(const MinibatchSourcePtr& minibatchSource,
                                              std::unordered_map<StreamInfo, std::pair<NDArrayViewPtr, NDArrayViewPtr>>& computedMeanAndInvStdDevs,
                                              const DeviceDescriptor& device /*= DeviceDescriptor::CPUDevice()*/)
    {
        typedef std::shared_ptr<ComputationNode<float>> ComputationNodePtr;
        const auto& minibatchSourceStreams = minibatchSource->StreamInfos();

        auto computationNetwork = std::make_shared<ComputationNetwork>(AsCNTKImplDeviceId(device));
        ComputationNetworkBuilder<float> builder(*computationNetwork);

        std::vector<ComputationNodeBasePtr> allInputNodes;
        std::unordered_map<StreamInfo, ComputationNodeBasePtr> streamToInputNodeMap;
        std::unordered_map<StreamInfo, Variable> streamToDummyInputVariableMap;
        std::unordered_map<StreamInfo, ComputationNodeBasePtr> streamToMeanNodeMap;
        std::unordered_map<StreamInfo, ComputationNodeBasePtr> streamToInvStdDevNodeMap;

        size_t totalSizePerSample = 0;
        for (auto& currentStreamKV : computedMeanAndInvStdDevs)
        {
            auto currentStreamInfo = currentStreamKV.first;
            if (minibatchSourceStreams.find(currentStreamInfo) == minibatchSourceStreams.end())
                InvalidArgument("ComputeMeanAndVariance: Stream for which mean and variance is to be computed is not supported by the specified minibatchSource");

            if (currentStreamInfo.m_elementType != DataType::Float)
                LogicError("Input data of type other than DataType::Float is currently unsupported by the CNTK built-in composite MinibatchSource!");

            auto inputVariableShape = currentStreamInfo.m_sampleLayout;
            auto inputTensorShape = AsTensorShape(inputVariableShape);
            totalSizePerSample += (inputVariableShape.TotalSize() * sizeof(float));

            ComputationNodePtr inputNode;
            Variable inputVariable;
            if (currentStreamInfo.m_storageFormat != StorageFormat::Dense)
            {
                inputNode = builder.CreateSparseInputNode(currentStreamInfo.m_name, inputTensorShape);
                inputVariable = Variable(inputVariableShape, true, DataType::Float, currentStreamInfo.m_name);
            }
            else
            {
                inputNode = builder.CreateInputNode(currentStreamInfo.m_name, inputTensorShape);
                inputVariable = Variable(inputVariableShape, DataType::Float, currentStreamInfo.m_name);
            }

            allInputNodes.push_back(inputNode);
            streamToInputNodeMap[currentStreamInfo] = inputNode;
            streamToDummyInputVariableMap[currentStreamInfo] = inputVariable;
            streamToMeanNodeMap[currentStreamInfo] = builder.Mean(inputNode);
            streamToInvStdDevNodeMap[currentStreamInfo] = builder.InvStdDev(inputNode);
        }

        computationNetwork->CompileNetwork();
        computationNetwork->AllocateAllMatrices(computationNetwork->RootNodes(), {}, nullptr);

        ScopedNetworkOperationMode modeGuard(computationNetwork, NetworkOperationMode::preComputing);

        // initialize
        auto preComputeNodes = computationNetwork->GetNodesRequiringPreComputation();
        for (auto & preComputeNode : preComputeNodes)
            dynamic_pointer_cast<IPreComputeNode>(preComputeNode)->MarkComputed(false /*begin accumulating*/);

        const size_t maxMinibatchDataSize = (1 << 27); // 128 MB
        const size_t minibatchSize = maxMinibatchDataSize / totalSizePerSample;
        std::unordered_map<StreamInfo, std::pair<size_t, size_t>> minibatchSizeLimits;
        for (auto& currentStreamKV : computedMeanAndInvStdDevs)
            minibatchSizeLimits.insert(std::make_pair(currentStreamKV.first, std::make_pair((size_t)0, minibatchSize)));

        for (;;)
        {
            auto minibatchData = minibatchSource->GetNextMinibatch(minibatchSizeLimits, device);
            if (minibatchData.empty())
                break;

            for (auto& currentStreamKV : computedMeanAndInvStdDevs)
                CompositeFunction::PopulateComputationNodeValue<float>({ streamToDummyInputVariableMap[currentStreamKV.first], minibatchData[currentStreamKV.first].m_data }, streamToInputNodeMap[currentStreamKV.first]);

            ComputationNetwork::BumpEvalTimeStamp(allInputNodes);

            computationNetwork->ForwardProp(preComputeNodes);
        }

        // finalize
        for (auto & preComputeNode : preComputeNodes)
            dynamic_pointer_cast<IPreComputeNode>(preComputeNode)->MarkComputed(true /*done accumulating*/);

        // Copy out the results
        for (auto& currentStreamKV : computedMeanAndInvStdDevs)
        {
            ValuePtr mean, invStdDev;
            if (computedMeanAndInvStdDevs[currentStreamKV.first].first != nullptr)
                mean = MakeSharedObject<Value>(computedMeanAndInvStdDevs[currentStreamKV.first].first);

            if (computedMeanAndInvStdDevs[currentStreamKV.first].second != nullptr)
                invStdDev = MakeSharedObject<Value>(computedMeanAndInvStdDevs[currentStreamKV.first].second);

            CompositeFunction::GetNodeOutputOrGradient(streamToDummyInputVariableMap[currentStreamKV.first], mean, streamToMeanNodeMap[currentStreamKV.first], false /*getGradient*/);
            CompositeFunction::GetNodeOutputOrGradient(streamToDummyInputVariableMap[currentStreamKV.first], invStdDev, streamToInvStdDevNodeMap[currentStreamKV.first], false /*getGradient*/);

            if (computedMeanAndInvStdDevs[currentStreamKV.first].first == nullptr)
                computedMeanAndInvStdDevs[currentStreamKV.first].first = mean->Data();

            if (computedMeanAndInvStdDevs[currentStreamKV.first].second == nullptr)
                computedMeanAndInvStdDevs[currentStreamKV.first].second = invStdDev->Data();

        }
    }
}

back to top