//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//

#include "stdafx.h"
#include "CNTKLibrary.h"
#include "Utils.h"
#include "PerformanceProfiler.h"

namespace CNTK
{
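    // Factory function: creates a stand-alone Evaluator over the given evaluation (metric)
    // function, reporting progress to the supplied progress writers.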
    EvaluatorPtr CreateEvaluator(const FunctionPtr& evaluationFunction, const std::vector<ProgressWriterPtr>& progressWriters)
    {
        return MakeSharedObject<Evaluator>(evaluationFunction, progressWriters, true);
    }

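    // The constructor wraps the evaluation function for aggregation: if the function's output
    // has dynamic axes, its values are summed over all axes (ReduceSum) to produce a single
    // aggregated metric per minibatch, and the original output is used to derive the sample
    // count; otherwise the function is assumed to already be aggregated and the sample count
    // is taken from its first input. When 'initializeCombined' is true (the stand-alone
    // Evaluator case), the combined function over all required outputs is built up front.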
    Evaluator::Evaluator(
        const FunctionPtr& evaluationFunction,
        const std::vector<ProgressWriterPtr>& progressWriters,
        bool initializeCombined)
        :  m_evaluationFunction(evaluationFunction),
           m_aggregatedTestEvalCriterionValue(std::make_shared<Accumulator>()),
           m_progressWriters(progressWriters.begin(), progressWriters.end())
    {
        // By default we set the number of threads to hardware concurrency.
        if (!Internal::MaxNumCPUThreadsSet())
            SetMaxNumCPUThreads(std::thread::hardware_concurrency());

        // A null evaluation function is only allowed for derived classes.
        if (!m_evaluationFunction)
        {
            if (initializeCombined)
                InvalidArgument("Eval function is not allowed to be null.");
            return;
        }

        if (!m_evaluationFunction->Output().DynamicAxes().empty())
        {
            m_aggregatedEvaluationFunction = ReduceSum(m_evaluationFunction, Axis::AllAxes(), L"aggregateEvalMetric");
            m_testSampleCountVar = m_evaluationFunction;
        }
        else
        {
            m_aggregatedEvaluationFunction = m_evaluationFunction;
            m_testSampleCountVar = m_evaluationFunction->RootFunction()->Inputs()[0];
        }

        if (initializeCombined)
            m_combinedEvalFunction = Combine(GetCombinedEvalFunctionArgs());
    }

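    // Collects the distinct output variables needed for evaluation: the evaluation function
    // itself, its aggregated form, and the variable used for counting samples.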
    std::vector<Variable> Evaluator::GetCombinedEvalFunctionArgs() const
    {
        if (!m_evaluationFunction)
            return std::vector<Variable>();

        std::vector<Variable> result{ m_evaluationFunction };
        if (m_evaluationFunction != m_aggregatedEvaluationFunction)
            result.push_back(m_aggregatedEvaluationFunction);

        if (m_evaluationFunction != m_testSampleCountVar && m_aggregatedEvaluationFunction != m_testSampleCountVar)
            result.push_back(m_testSampleCountVar);

        return result;
    }

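    // Computes the number of actual samples in 'value': the total number of samples implied by
    // the Value's shape (beyond the variable's own sample shape) minus the masked entries.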
    size_t Evaluator::GetSampleCount(const Variable& var, const ValuePtr& value)
    {
        auto valueDataShape = value->Shape();
        size_t numMaskedSamples = value->MaskedCount();
        size_t numSamplesInDataArrayView = valueDataShape.SubShape(var.Shape().Rank()).TotalSize();
        if (numMaskedSamples > numSamplesInDataArrayView)
            LogicError("Number (%d) of masked values cannot exceed the number (%d) of samples that the Value object's Data NDArrayView can hold.",
                       (int)numMaskedSamples, (int)numSamplesInDataArrayView);

        return (numSamplesInDataArrayView - numMaskedSamples);
    }

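    // Unpacks a map of MinibatchData into the plain Variable -> Value map expected by Forward().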
    std::unordered_map<Variable, ValuePtr> Evaluator::GetInputs(const std::unordered_map<Variable, MinibatchData>& arguments)
    {
        std::unordered_map<Variable, ValuePtr> inputs(arguments.size());
        for (const auto& kv : arguments)
        {
            inputs[kv.first] = kv.second.data;
        }
        return inputs;
    }

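    // TestMinibatch overloads: evaluate the metric on a single minibatch and return the average
    // criterion value per sample. Illustrative usage (a minimal sketch; 'metricFunction',
    // 'minibatchSource', 'features', 'labels' and the stream-info names are placeholders):
    //
    //   auto evaluator = CreateEvaluator(metricFunction);
    //   while (true)
    //   {
    //       auto mb = minibatchSource->GetNextMinibatch(mbSize, device);
    //       if (mb.empty())
    //           break;
    //       evaluator->TestMinibatch({ { features, mb.at(featuresStreamInfo) },
    //                                  { labels,   mb.at(labelsStreamInfo) } }, device);
    //   }
    //   evaluator->SummarizeTestProgress();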
    double Evaluator::TestMinibatch(const std::unordered_map<Variable, MinibatchData>& arguments, const DeviceDescriptor& computeDevice /*= DeviceDescriptor::UseDefaultDevice()*/)
    {
        std::unordered_map<Variable, ValuePtr> outputsToFetch = {};
        return TestMinibatch(GetInputs(arguments), outputsToFetch, computeDevice);
    }

    double Evaluator::TestMinibatch(const std::unordered_map<Variable, ValuePtr>& arguments, const DeviceDescriptor& computeDevice /*= DeviceDescriptor::UseDefaultDevice()*/)
    {
        std::unordered_map<Variable, ValuePtr> outputsToFetch = {};
        return TestMinibatch(arguments, outputsToFetch, computeDevice);
    }

    double Evaluator::TestMinibatch(const std::unordered_map<Variable, MinibatchData>& arguments, std::unordered_map<Variable, ValuePtr>& outputsToFetch, const DeviceDescriptor& computeDevice)
    {
        return TestMinibatch(GetInputs(arguments), outputsToFetch, computeDevice);
    }

    double Evaluator::TestMinibatch(const std::unordered_map<Variable, ValuePtr>& arguments, std::unordered_map<Variable, ValuePtr>& outputsToFetch, const DeviceDescriptor& computeDevice)
    {
        std::pair<ValuePtr, size_t> evalMinibatchValue;
        TestMinibatch(arguments, outputsToFetch, evalMinibatchValue, computeDevice, false);
        return evalMinibatchValue.first->AsScalar<double>() / evalMinibatchValue.second;
    }

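    // Variants that report the raw aggregated criterion Value and the sample count through
    // 'result' and return whether the (possibly distributed) minibatch contained any data.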
    bool Evaluator::TestMinibatch(const std::unordered_map<Variable, ValuePtr>& arguments, std::pair<ValuePtr, size_t>& result, const DeviceDescriptor& computeDevice, bool distributed)
    {
        std::unordered_map<Variable, ValuePtr> outputsToFetch = {};
        return TestMinibatch(arguments, outputsToFetch, result, computeDevice, distributed);
    }

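    // Core implementation: evaluates locally, then (when 'distributed' is true) sums the
    // criterion value and sample count across all MPI workers before updating progress.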
    bool Evaluator::TestMinibatch(const std::unordered_map<Variable, ValuePtr>& arguments, std::unordered_map<Variable, ValuePtr>& outputsToFetch, std::pair<ValuePtr, size_t>& result, const DeviceDescriptor& computeDevice, bool distributed)
    {
        result = TestLocalMinibatch(arguments, outputsToFetch, computeDevice);
        if (distributed)
        {
            if (!outputsToFetch.empty())
                RuntimeError("Custom outputs are not yet supported in distributed evaluation.");

            double localSampleCount = static_cast<double>(result.second);

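            // Pack the local criterion data and the sample count (as a scalar CPU NDArrayView)
            // and sum both in place across all workers.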
            auto values = std::vector<NDArrayViewPtr>{ result.first->Data(), MakeSharedObject<NDArrayView>(NDShape{}, &localSampleCount, 1, DeviceDescriptor::CPUDevice()) };
            DistributedCommunicatorPtr communicator = MPICommunicator();
            communicator->AggregateInPlace(values, communicator->Workers());
            result.second = static_cast<size_t>(localSampleCount);
        }

        bool hasData = (result.second != 0);
        if (hasData)
            UpdateTestProgress(result.second, result.first, computeDevice);

        return hasData;
    }

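    // Runs a single forward pass over the combined evaluation function on the local worker and
    // returns the aggregated criterion Value together with the number of samples it covers.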
    std::pair<ValuePtr, size_t> Evaluator::TestLocalMinibatch(const std::unordered_map<Variable, ValuePtr>& arguments, std::unordered_map<Variable, ValuePtr>& outputsToFetch, const DeviceDescriptor& computeDevice)
    {
        if (!m_aggregatedEvaluationFunction)
            InvalidArgument("Evaluator::TestMinibatch: Cannot test when no evaluation function was specified during construction.");

        if (arguments.empty()) // Empty minibatch: return a zero criterion value and a sample count of 0.
        {
            auto zeroValue = MakeSharedObject<Value>(
                MakeSharedObject<NDArrayView>(
                    m_aggregatedEvaluationFunction->Output().GetDataType(),
                    m_aggregatedEvaluationFunction->Output().IsSparse() ? StorageFormat::SparseCSC : StorageFormat::Dense,
                    m_aggregatedEvaluationFunction->Output().Shape(), computeDevice));
            if (zeroValue->GetDataType() == DataType::Float)
                zeroValue->Data()->SetValue(0.0f);
            else
                zeroValue->Data()->SetValue(0.0);
            return std::make_pair(zeroValue, 0);
        }

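        // Request the aggregated criterion and the sample-count variable, plus any additional
        // outputs the caller asked for.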
        std::unordered_map<Variable, ValuePtr> outputs = { { m_aggregatedEvaluationFunction, nullptr }, { m_testSampleCountVar, nullptr } };
        outputs.insert(outputsToFetch.begin(), outputsToFetch.end());

        m_combinedEvalFunction->Forward(arguments, outputs, computeDevice);

        const ValuePtr& aggregateEvalCriterionValue = outputs[m_aggregatedEvaluationFunction];
        auto sampleCount = GetSampleCount(m_testSampleCountVar, outputs[m_testSampleCountVar]);

        // Copy back output values for requested variables only.
        for (auto& o : outputsToFetch)
            o.second = outputs[o.first];

        return std::make_pair(aggregateEvalCriterionValue, sampleCount);
    }

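    // Accumulates the minibatch criterion into the running test aggregate and notifies the
    // progress writers.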
    void Evaluator::UpdateTestProgress(size_t numSamples, const ValuePtr& evalCriterion, const DeviceDescriptor& computeDevice)
    {
        if (numSamples == 0)
            return;

        if (m_aggregatedTestEvalCriterionValue)
            m_aggregatedTestEvalCriterionValue->Update(evalCriterion, computeDevice);

        for (auto& progressWriter : m_progressWriters)
            progressWriter->UpdateTest(numSamples, m_aggregatedTestEvalCriterionValue);
    }

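    // Writes the final test summary to all progress writers and resets the accumulated criterion.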
    void Evaluator::SummarizeTestProgress()
    {
        for (auto& progressWriter : m_progressWriters)
            progressWriter->WriteTestSummary(m_aggregatedTestEvalCriterionValue);

        if (m_aggregatedTestEvalCriterionValue)
            m_aggregatedTestEvalCriterionValue->Reset();
    }
}