https://github.com/Microsoft/CNTK
Tip revision: 4419c2b48d055af4810fab27f2441bb91b22b45f authored by Binbin Zhang on 04 June 2018, 03:50:57 UTC
add bidirectional FSMN node and make it work in NDL and add FSMN CPU forward
Tip revision: 4419c2b
Evaluator.cpp
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
#include "stdafx.h"
#include "CNTKLibrary.h"
#include "Utils.h"
#include "PerformanceProfiler.h"
namespace CNTK
{
// Factory for a standalone Evaluator over 'evaluationFunction'.
// The trailing 'true' tells the constructor to build the combined
// evaluation graph immediately (derived classes pass false and defer it).
EvaluatorPtr CreateEvaluator(const FunctionPtr& evaluationFunction, const std::vector<ProgressWriterPtr>& progressWriters)
{
    auto evaluator = MakeSharedObject<Evaluator>(evaluationFunction, progressWriters, /*initializeCombined=*/ true);
    return evaluator;
}
// Constructs an Evaluator over 'evaluationFunction'.
//
// evaluationFunction : metric to evaluate; may be null only when a derived
//                      class passes initializeCombined = false.
// progressWriters    : sinks that receive per-minibatch and summary updates.
// initializeCombined : when true (as CreateEvaluator passes), a combined
//                      Function over all evaluation outputs is built here;
//                      derived classes defer this.
Evaluator::Evaluator(
const FunctionPtr& evaluationFunction,
const std::vector<ProgressWriterPtr>& progressWriters,
bool initializeCombined)
: m_evaluationFunction(evaluationFunction),
m_aggregatedTestEvalCriterionValue(std::make_shared<Accumulator>()),
m_progressWriters(progressWriters.begin(), progressWriters.end())
{
// By default we set the number of threads to hardware concurrency.
if (!Internal::MaxNumCPUThreadsSet())
SetMaxNumCPUThreads(std::thread::hardware_concurrency());
// Nullptr evaluation is only allowed by the derived classes.
if (!m_evaluationFunction)
{
if(initializeCombined)
InvalidArgument("Eval function is not allowed to be null.");
return;
}
// With dynamic axes the metric is per-sample: sum it over all axes to get
// the minibatch aggregate, and use the metric itself to count samples.
// Without dynamic axes the function is presumably already aggregated, and
// its root's first input serves as the sample-count variable — TODO confirm.
if (!m_evaluationFunction->Output().DynamicAxes().empty())
{
m_aggregatedEvaluationFunction = ReduceSum(m_evaluationFunction, Axis::AllAxes(), L"aggregateEvalMetric");
m_testSampleCountVar = m_evaluationFunction;
}
else
{
m_aggregatedEvaluationFunction = m_evaluationFunction;
m_testSampleCountVar = m_evaluationFunction->RootFunction()->Inputs()[0];
}
if(initializeCombined)
m_combinedEvalFunction = Combine(GetCombinedEvalFunctionArgs());
}
std::vector<Variable> Evaluator::GetCombinedEvalFunctionArgs() const
{
if (!m_evaluationFunction)
return std::vector<Variable>();
std::vector<Variable> result{ m_evaluationFunction };
if (m_evaluationFunction != m_aggregatedEvaluationFunction)
result.push_back(m_aggregatedEvaluationFunction);
if (m_evaluationFunction != m_testSampleCountVar && m_aggregatedEvaluationFunction != m_testSampleCountVar)
result.push_back(m_testSampleCountVar);
return result;
}
// Computes the number of actual (unmasked) samples carried by 'value' for
// variable 'var': the dimensions of the Value shape beyond the variable's
// own rank give the raw sample count, from which masked entries are
// subtracted.
size_t Evaluator::GetSampleCount(const Variable& var, const ValuePtr& value)
{
    const auto valueShape = value->Shape();
    const size_t totalSamples = valueShape.SubShape(var.Shape().Rank()).TotalSize();
    const size_t maskedSamples = value->MaskedCount();

    if (maskedSamples > totalSamples)
        LogicError("Number (%d) of masked values cannot exceed the number (%d) of samples that the Value object's Data NDArrayView can hold.",
            (int)maskedSamples, (int)totalSamples);

    return totalSamples - maskedSamples;
}
std::unordered_map<Variable, ValuePtr> Evaluator::GetInputs(const std::unordered_map<Variable, MinibatchData>& arguments)
{
std::unordered_map<Variable, ValuePtr> inputs(arguments.size());
for (const auto& kv : arguments)
{
inputs[kv.first] = kv.second.data;
}
return inputs;
}
// Convenience overload: evaluate a MinibatchData minibatch without
// requesting any additional outputs.
double Evaluator::TestMinibatch(const std::unordered_map<Variable, MinibatchData>& arguments, const DeviceDescriptor& computeDevice /*= DeviceDescriptor::UseDefaultDevice()*/)
{
    std::unordered_map<Variable, ValuePtr> noOutputs;
    return TestMinibatch(GetInputs(arguments), noOutputs, computeDevice);
}
// Convenience overload: evaluate a Variable->Value minibatch without
// requesting any additional outputs.
double Evaluator::TestMinibatch(const std::unordered_map<Variable, ValuePtr>& arguments, const DeviceDescriptor& computeDevice /*= DeviceDescriptor::UseDefaultDevice()*/)
{
    std::unordered_map<Variable, ValuePtr> noOutputs;
    return TestMinibatch(arguments, noOutputs, computeDevice);
}
double Evaluator::TestMinibatch(const std::unordered_map<Variable, MinibatchData>& arguments, std::unordered_map<Variable, ValuePtr>& outputsToFetch, const DeviceDescriptor& computeDevice)
{
return TestMinibatch(GetInputs(arguments), outputsToFetch, computeDevice);
}
double Evaluator::TestMinibatch(const std::unordered_map<Variable, ValuePtr>& arguments, std::unordered_map<Variable, ValuePtr>& outputsToFetch, const DeviceDescriptor& computeDevice)
{
std::pair<ValuePtr, size_t> evalMinibatchValue;
TestMinibatch(arguments, outputsToFetch, evalMinibatchValue, computeDevice, false);
return evalMinibatchValue.first->AsScalar<double>() / evalMinibatchValue.second;
}
// Overload without custom outputs: forwards to the full distributed-aware
// overload with an empty outputs map.
bool Evaluator::TestMinibatch(const std::unordered_map<Variable, ValuePtr>& arguments, std::pair<ValuePtr, size_t>& result, const DeviceDescriptor& computeDevice, bool distributed)
{
    std::unordered_map<Variable, ValuePtr> noOutputs;
    return TestMinibatch(arguments, noOutputs, result, computeDevice, distributed);
}
// Evaluates one minibatch and writes into 'result' the pair
// (aggregated criterion Value, sample count). When 'distributed' is true,
// both the criterion and the sample count are aggregated in place across
// MPI workers, so 'result' then holds the global totals.
// Returns true iff any samples were processed; progress writers are
// updated only in that case.
bool Evaluator::TestMinibatch(const std::unordered_map<Variable, ValuePtr>& arguments, std::unordered_map<Variable, ValuePtr>& outputsToFetch, std::pair<ValuePtr, size_t>& result, const DeviceDescriptor& computeDevice, bool distributed)
{
result = TestLocalMinibatch(arguments, outputsToFetch, computeDevice);
if (distributed)
{
if (!outputsToFetch.empty())
RuntimeError("Custom outputs are not yet supported in distributed evaluation.");
// Wrap the local sample count in a scalar CPU NDArrayView so that it can
// be aggregated together with the criterion value in a single call.
double localSampleCount = static_cast<double>(result.second);
auto values = std::vector<NDArrayViewPtr>{ result.first->Data(), MakeSharedObject<NDArrayView>(NDShape{}, &localSampleCount, 1, DeviceDescriptor::CPUDevice()) };
DistributedCommunicatorPtr communicator = MPICommunicator();
communicator->AggregateInPlace(values, communicator->Workers());
// AggregateInPlace updated localSampleCount (and result.first's data)
// in place with the across-worker aggregate.
result.second = static_cast<size_t>(localSampleCount);
}
bool hasData = (result.second != 0);
if (hasData)
UpdateTestProgress(result.second, result.first, computeDevice);
return hasData;
}
// Runs the combined evaluation function on 'arguments' on this worker only
// and returns (aggregated criterion Value, sample count).
// An empty 'arguments' map is treated as an empty minibatch: a zero-filled
// Value with sample count 0 is returned without calling Forward(). Values
// for variables present in 'outputsToFetch' are copied back into that map.
std::pair<ValuePtr, size_t> Evaluator::TestLocalMinibatch(const std::unordered_map<Variable, ValuePtr>& arguments, std::unordered_map<Variable, ValuePtr>& outputsToFetch, const DeviceDescriptor& computeDevice)
{
if (!m_aggregatedEvaluationFunction)
InvalidArgument("Evaluator::TestMinibatch: Cannot test when no evaluation function was specified during construction.");
if (arguments.empty()) // Empty minibatch, return 0.
{
// Build a zero Value matching the aggregated metric's type/shape so the
// caller can still accumulate it safely.
auto zeroValue = MakeSharedObject<Value>(
MakeSharedObject<NDArrayView>(
m_aggregatedEvaluationFunction->Output().GetDataType(),
m_aggregatedEvaluationFunction->Output().IsSparse() ? StorageFormat::SparseCSC : StorageFormat::Dense,
m_aggregatedEvaluationFunction->Output().Shape(), computeDevice));
if(zeroValue->GetDataType() == DataType::Float)
zeroValue->Data()->SetValue(0.0f);
else
zeroValue->Data()->SetValue(0.0);
return std::make_pair(zeroValue, 0);
}
// Request the aggregated metric and the sample-count variable; insert()
// will NOT overwrite these two entries if outputsToFetch also lists them.
std::unordered_map<Variable, ValuePtr> outputs = { { m_aggregatedEvaluationFunction, nullptr }, { m_testSampleCountVar, nullptr } };
outputs.insert(outputsToFetch.begin(), outputsToFetch.end());
m_combinedEvalFunction->Forward(arguments, outputs, computeDevice);
const ValuePtr& aggregateEvalCriterionValue = outputs[m_aggregatedEvaluationFunction];
auto sampleCount = GetSampleCount(m_testSampleCountVar, outputs[m_testSampleCountVar]);
// Copy back output values for requested variables only.
for (auto& o : outputsToFetch)
o.second = outputs[o.first];
return make_pair(aggregateEvalCriterionValue, sampleCount);
}
void Evaluator::UpdateTestProgress(size_t numSamples, const ValuePtr& evalCriterion, const DeviceDescriptor& computeDevice)
{
if (numSamples == 0)
return;
if (m_aggregatedTestEvalCriterionValue)
m_aggregatedTestEvalCriterionValue->Update(evalCriterion, computeDevice);
for (auto& progressWriter : m_progressWriters)
progressWriter->UpdateTest(numSamples, m_aggregatedTestEvalCriterionValue);
}
void Evaluator::SummarizeTestProgress()
{
for (auto& progressWriter : m_progressWriters)
progressWriter->WriteTestSummary(m_aggregatedTestEvalCriterionValue);
if (m_aggregatedTestEvalCriterionValue)
m_aggregatedTestEvalCriterionValue->Reset();
}
}