// DataParallelDistributedLearner.cpp
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//

#include "stdafx.h"
#include "DataParallelDistributedLearner.h"
#include "DistributedCommunicator.h"
#include "Learner.h"
#include "PerformanceProfiler.h"

#ifdef CNTK_PARALLEL_TRAINING_SUPPORT
#include "QuantizedDistributedCommunicator.h"
#include "QuantizedDataParallelDistributedLearner.h"
#include "BlockMomentumDistributedLearner.h"
#endif

namespace CNTK
{
#ifdef CNTK_PARALLEL_TRAINING_SUPPORT
    QuantizedDistributedCommunicatorPtr QuantizedMPICommunicator(bool zeroThresholdFor1Bit, bool useQuantizationForSelfStripe, size_t numQuantizationBits)
    {
        return MakeSharedObject<QuantizedMPICommunicatorImpl>(zeroThresholdFor1Bit, useQuantizationForSelfStripe, numQuantizationBits);
    }
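
    // A hedged usage sketch: for 1-bit SGD, a quantized communicator is
    // typically created with numQuantizationBits == 1, e.g.
    //
    //   auto communicator = QuantizedMPICommunicator(
    //       /*zeroThresholdFor1Bit=*/true,
    //       /*useQuantizationForSelfStripe=*/true,
    //       /*numQuantizationBits=*/1);
    //
    // and then passed to CreateQuantizedDataParallelDistributedLearner below.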

    DistributedLearnerPtr CreateQuantizedDataParallelDistributedLearner(
        QuantizedDistributedCommunicatorPtr communicator,
        LearnerPtr learner,
        size_t distributeAfterSamples,
        bool useAsyncBufferedParameterUpdate)
    {
        return MakeSharedObject<QuantizedDataParallelDistributedLearner>(communicator, learner, distributeAfterSamples, useAsyncBufferedParameterUpdate);
    }

    DistributedLearnerPtr CreateBlockMomentumDistributedLearner(
        DistributedCommunicatorPtr communicator,
        LearnerPtr learner,
        size_t distributeAfterSamples,
        size_t blockSize,
        bool useNesterovMomentum,
        bool resetSGDMomentumAfterAggregation,
        double blockLearningRate)
    {
        return MakeSharedObject<BlockMomentumDistributedLearner>(
            communicator,
            learner,
            distributeAfterSamples,
            blockSize,
            useNesterovMomentum,
            resetSGDMomentumAfterAggregation,
            blockLearningRate);
    }

    DistributedLearnerPtr CreateBlockMomentumDistributedLearner(
        DistributedCommunicatorPtr communicator,
        LearnerPtr learner,
        size_t distributeAfterSamples,
        size_t blockSize,
        double blockMomentumAsTimeConstant,
        bool useNesterovMomentum,
        bool resetSGDMomentumAfterAggregation,
        double blockLearningRate)
    {
        return MakeSharedObject<BlockMomentumDistributedLearner>(
            communicator,
            learner,
            distributeAfterSamples,
            blockSize,
            useNesterovMomentum,
            resetSGDMomentumAfterAggregation,
            blockLearningRate,
            blockMomentumAsTimeConstant);
    }
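
    // The two overloads above differ only in how block momentum is chosen:
    // the first leaves it to be derived internally from the block size, while
    // the second takes an explicit blockMomentumAsTimeConstant (roughly, the
    // number of samples over which a block's contribution decays to 1/e).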

#else
    QuantizedDistributedCommunicatorPtr QuantizedMPICommunicator(bool, bool, size_t)
    {
        LogicError("Quantized MPI Communicator is not supported in this build. A GPU build is required; see the CNTK wiki for details.");
    }

    DistributedLearnerPtr CreateQuantizedDataParallelDistributedLearner(QuantizedDistributedCommunicatorPtr, LearnerPtr, size_t, bool)
    {
        LogicError("The quantized data-parallel distributed learner is not supported in this build. A GPU build is required; see the CNTK wiki for details.");
    }

    DistributedLearnerPtr CreateBlockMomentumDistributedLearner(
        DistributedCommunicatorPtr /*communicator*/,
        LearnerPtr,
        size_t /*distributeAfterSamples*/,
        size_t /*blockSize*/,
        bool /*useNesterovMomentum*/,
        bool /*resetSGDMomentumAfterAggregation*/,
        double /*blockLearningRate*/)
    {
        LogicError("The block momentum distributed learner is not supported in this build. A GPU build is required; see the CNTK wiki for details.");
    }

    DistributedLearnerPtr CreateBlockMomentumDistributedLearner(
        DistributedCommunicatorPtr /*communicator*/,
        LearnerPtr,
        size_t /*distributeAfterSamples*/,
        size_t /*blockSize*/,
        double /*blockMomentumAsTimeConstant*/,
        bool /*useNesterovMomentum*/,
        bool /*resetSGDMomentumAfterAggregation*/,
        double /*blockLearningRate*/)
    {
        LogicError("The block momentum distributed learner is not supported in this build. A GPU build is required; see the CNTK wiki for details.");
    }
#endif

    DistributedLearnerPtr CreateDataParallelDistributedLearner(DistributedCommunicatorPtr communicator, LearnerPtr learner, size_t distributeAfterSamples, bool useAsyncBufferedParameterUpdate)
    {
        return MakeSharedObject<DataParallelDistributedLearner>(communicator, learner, distributeAfterSamples, useAsyncBufferedParameterUpdate);
    }
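
    // A minimal usage sketch (model, loss, and evalError are illustrative
    // placeholders, not part of this file; the schedule type may differ
    // across CNTK versions):
    //
    //   auto communicator = MPICommunicator();
    //   auto localLearner = SGDLearner(model->Parameters(),
    //                                  LearningRatePerSampleSchedule(0.01));
    //   auto distLearner = CreateDataParallelDistributedLearner(
    //       communicator, localLearner,
    //       /*distributeAfterSamples=*/0,
    //       /*useAsyncBufferedParameterUpdate=*/false);
    //   auto trainer = CreateTrainer(model, loss, evalError, { distLearner });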

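    // Note: the base class converts sparse gradients to dense for aggregation
    // unless sparse gradient aggregation is explicitly enabled for
    // data-parallel SGD (the negated flag passed below).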
    DataParallelDistributedLearner::DataParallelDistributedLearner(DistributedCommunicatorPtr communicator, LearnerPtr learner, size_t distributeAfterSamples, bool useAsyncBufferedParameterUpdate)
        : DistributedLearnerBase(communicator, learner, distributeAfterSamples, !Internal::ShouldUseSparseGradientAggregationInDataParallelSGD())
    {
        if (useAsyncBufferedParameterUpdate)
            LogicError("Asynchronous parameter update is not yet supported for the DataParallelDistributedLearner.");
    }

    bool DataParallelDistributedLearner::Update(std::unordered_map<Parameter, NDArrayViewPtr>& gradientValues, MinibatchInfo& info)
    {
        // Sparse gradients may be converted to dense for aggregation.
        std::unordered_map<Parameter, NDArrayViewPtr> convertedGradientValues = gradientValues;

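        // Aggregate across workers only once the warm-start threshold
        // (m_distributeAfterSamples) has been passed and more than one worker
        // is present; otherwise the update below is purely local.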
        if (m_sampleCount >= m_distributeAfterSamples && m_communicator->Workers().size() > 1)
        {
#ifndef CNTK_UWP
            auto profGradientAgg = Microsoft::MSR::CNTK::ScopeProfile(Microsoft::MSR::CNTK::profilerEvtMainGradient);
#endif

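            // A worker that ran out of data still participates in aggregation:
            // contribute zero gradients so the collective calls stay in sync.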
            if (info.IsEmpty())
                PrepaireZeroGradients(gradientValues);

            // Sort gradient buffers by parameter uid, and perform sparse-to-dense
            // conversion unless sparse gradient aggregation is enabled
            // (i.e. when !UseSparseGradientAggregationInDataParallelSGD()).
            ConvertToOrdered(gradientValues, m_gradientBuffer, &convertedGradientValues);

            std::vector<NDArrayViewPtr> valuesToAggregate;
            std::vector<NDArrayViewPtr> sparseValuesToAggregate;
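
            // Split gradients into dense and sparse buckets: dense tensors go
            // through the regular all-reduce, while sparse-block-column tensors
            // take a dedicated sparse aggregation path below.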
            for (const auto& i : m_gradientBuffer)
            {
                auto storageFormat = i.second->GetStorageFormat();
                if (storageFormat == StorageFormat::Dense)
                {
                    valuesToAggregate.push_back(i.second);
                }
                else
                {
                    if (storageFormat != StorageFormat::SparseBlockCol)
                        LogicError("Unsupported sparse gradient format");

                    // NOTE: CPU sparse block column (SBC) stores block ids as size_t,
                    // which differs from the GPU SBC format; the CPU SBC code should
                    // be refactored to align with the GPU implementation in the future.
                    if (i.second->Device().Type() == DeviceKind::CPU)
                        LogicError("Unsupported CPU sparse block column aggregation");

                    sparseValuesToAggregate.push_back(i.second);
                }
            }
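
            // Piggyback the evaluation criterion and training loss onto the same
            // all-reduce so that every worker sees identical aggregated metrics.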
            valuesToAggregate.push_back(info.evalCriterionValue);
            valuesToAggregate.push_back(info.trainingLossValue);

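            // Pack the local sample count into a scalar NDArrayView on the CPU so
            // it rides along in the same aggregation; the global count is read
            // back after the all-reduce.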
            auto value = MakeSharedObject<NDArrayView>(static_cast<double>(info.numberOfSamples), NDShape{}, DeviceDescriptor::CPUDevice());
            valuesToAggregate.push_back(value);

            m_communicator->AggregateInPlace(valuesToAggregate, m_communicator->Workers());
            info.numberOfSamples = static_cast<size_t>(*valuesToAggregate.back()->WritableDataBuffer<double>());

            if (!sparseValuesToAggregate.empty())
            {
                m_communicator->AllReduceSparseBlockColumn(sparseValuesToAggregate);
            }
        }

#ifndef CNTK_UWP
        auto profWeights = Microsoft::MSR::CNTK::ScopeProfile(Microsoft::MSR::CNTK::profilerEvtMainWeights);
#endif

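        // Accumulate the (possibly aggregated) sample count; it drives the
        // m_distributeAfterSamples warm-start check above.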
        m_sampleCount += info.numberOfSamples;
        m_gradientBuffer.clear();

        if (info.IsEmpty())
            return false;

        return m_learner->Update(convertedGradientValues, info.numberOfSamples, info.atEndOfSweep);
    }
}