https://github.com/Microsoft/CNTK
Tip revision: 824963e7824baadbf31f6e84849ba5f542cfaac1 authored by Nikola Velickovic (E-Search) on 09 May 2017, 08:35:44 UTC
Supress mem sharing
Supress mem sharing
Tip revision: 824963e
SparsePCReader.cpp
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
// SparsePCReader.cpp : Defines the Sparse Parallel Corpus reader.
//
#include "stdafx.h"
#define DATAREADER_EXPORTS // creating the exports here
#include "DataReader.h"
#include "SparsePCReader.h"
#ifdef LEAKDETECT
#include <vld.h> // leak detection
#endif
#ifndef SPARSE_PCREADER_USE_WINDOWS_API
#include <sys/mman.h>
#include <sys/stat.h>
#include <stdio.h>
#include <fcntl.h>
#endif
namespace Microsoft { namespace MSR { namespace CNTK {
DWORD HIDWORD(size_t size)
{
return size >> 32;
}
DWORD LODWORD(size_t size)
{
return size & 0xFFFFFFFF;
}
template <class ElemType>
SparsePCReader<ElemType>::~SparsePCReader()
{
#ifdef SPARSE_PCREADER_USE_WINDOWS_API
if (m_filemap != NULL)
{
UnmapViewOfFile(m_filemap);
}
if (m_dataBuffer != NULL)
{
UnmapViewOfFile(m_dataBuffer);
}
CloseHandle(m_hndl);
#else
munmap(m_dataBuffer, m_filePositionMax);
close(m_hndl);
#endif
for (int i = 0; i < m_featureCount; i++)
{
if (m_values[i] != NULL)
{
free(m_values[i]);
}
if (m_rowIndices[i] != NULL)
{
free(m_rowIndices[i]);
}
if (m_colIndices[i] != NULL)
{
free(m_colIndices[i]);
}
}
if (m_labelsBuffer != NULL)
{
free(m_labelsBuffer);
}
}
template <class ElemType>
void SparsePCReader<ElemType>::Destroy()
{
delete this;
}
// Init - Reader Initialize for multiple data sets
// config - [in] configuration parameters for the datareader
template <class ElemType>
template <class ConfigRecordType>
void SparsePCReader<ElemType>::InitFromConfig(const ConfigRecordType& readerConfig)
{
// Sparse PC reader considers every consecutive N rows to be part of a single block.
// This is used later to compute the corss-entropy with softmax per block.
// Default value is 1 to indicate all rows are independent.
m_microBatchSize = readerConfig(L"microbatchSize", (size_t) 1);
m_miniBatchSize = 0;
m_traceLevel = readerConfig(L"traceLevel", 0);
m_maxReadData = readerConfig(L"maxReadData", (size_t) 0);
m_doGradientCheck = readerConfig(L"gradientCheck", false);
m_returnDense = readerConfig(L"returnDense", false);
m_sparsenessFactor = readerConfig(L"sparsenessFactor", (size_t) 50); // We don't expect more than one in 50 input positions to have non-zero values
m_verificationCode = (int32_t) readerConfig(L"verificationCode", (size_t) 0);
std::vector<std::wstring> featureNames;
std::vector<std::wstring> labelNames;
m_file = (const wstring&) readerConfig(L"file");
// Determine the names of the features and lables sections in the config file.
// features - [in,out] a vector of feature name strings
// labels - [in,out] a vector of label name strings
GetFileConfigNames(readerConfig, featureNames, labelNames);
if (labelNames.size() != 1)
{
RuntimeError("SparsePC requires exactly one label. Their names should match those in NDL definition");
return;
}
m_featureCount = featureNames.size();
m_labelName = labelNames[0];
m_featureNames = std::vector<std::wstring>(m_featureCount);
m_dims = std::vector<size_t>(m_featureCount);
m_values = std::vector<ElemType*>(m_featureCount);
m_rowIndices = std::vector<int32_t*>(m_featureCount);
m_colIndices = std::vector<int32_t*>(m_featureCount);
for (int i = 0; i < m_featureCount; i++)
{
// In the config file, we must specify query features first, then document features. The sequence is different here. Pay attention
m_featureNames[i] = featureNames[m_featureCount - i - 1];
ConfigParameters featureConfig = readerConfig(m_featureNames[i]);
if (featureConfig.size() == 0)
RuntimeError("features config not found, required in configuration: i.e. 'features=[dim=506530]'");
m_dims[i] = featureConfig("dim");
}
#ifdef SPARSE_PCREADER_USE_WINDOWS_API
m_hndl = CreateFile(m_file.c_str(), GENERIC_READ,
FILE_SHARE_READ, NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL);
if (m_hndl == INVALID_HANDLE_VALUE)
{
RuntimeError("Unable to Open/Create file %ls, error %x", m_file.c_str(), GetLastError());
}
GetFileSizeEx(m_hndl, (PLARGE_INTEGER) &m_filePositionMax);
m_filemap = CreateFileMapping(m_hndl, NULL, PAGE_READONLY, 0, 0, NULL);
m_dataBuffer = (char*) MapViewOfFile(m_filemap,
FILE_MAP_READ,
HIDWORD(0),
LODWORD(0),
0);
#else
m_hndl = open(msra::strfun::utf8(m_file).c_str(), O_RDONLY);
if (m_hndl == -1)
RuntimeError("Unable to Open/Create file %ls", m_file.c_str());
struct stat sb;
if (fstat(m_hndl, &sb) == -1)
RuntimeError("Unable to Retrieve file size for file %ls", m_file.c_str());
m_filePositionMax = sb.st_size;
m_dataBuffer = (char*) mmap(nullptr, m_filePositionMax, PROT_READ, MAP_PRIVATE, m_hndl, 0);
if (m_dataBuffer == MAP_FAILED)
{
m_dataBuffer = nullptr;
RuntimeError("Could not memory map file %ls", m_file.c_str());
}
#endif
}
//StartMinibatchLoop - Startup a minibatch loop
// mbSize - [in] size of the minibatch (number of Samples, etc.)
// epoch - [in] epoch number for this loop --ignored
// requestedEpochSamples - [in] number of samples to randomize --ignored
template <class ElemType>
void SparsePCReader<ElemType>::StartMinibatchLoop(size_t mbSize, size_t /*epoch*/, size_t /*requestedEpochSamples*/)
{
if (m_values[0] == NULL || m_miniBatchSize != mbSize)
{
m_miniBatchSize = mbSize;
for (int i = 0; i < m_featureCount; i++)
{
if (m_values[i] != NULL)
{
free(m_values[i]);
free(m_colIndices[i]);
free(m_rowIndices[i]);
free(m_labelsBuffer);
}
m_values[i] = (ElemType*) malloc(sizeof(ElemType) * m_dims[i] * m_miniBatchSize / m_sparsenessFactor);
m_rowIndices[i] = (int32_t*) malloc(sizeof(int32_t) * m_dims[i] * m_miniBatchSize / m_sparsenessFactor);
m_colIndices[i] = (int32_t*) malloc(sizeof(int32_t) * (m_miniBatchSize + 1));
m_labelsBuffer = (ElemType*) malloc(sizeof(ElemType) * m_miniBatchSize);
}
}
// reset the next read sample
m_currOffset = 0;
}
// GetMinibatch - Get the next minibatch (features and labels)
// matrices - [in] a map with named matrix types (i.e. 'features', 'labels') mapped to the corresponding matrix,
// [out] each matrix resized if necessary containing data.
// returns - true if there are more minibatches, false if no more minibatchs remain
template <class ElemType>
bool SparsePCReader<ElemType>::TryGetMinibatch(StreamMinibatchInputs& matrices)
{
// get out if they didn't call StartMinibatchLoop() first
if (m_miniBatchSize == 0)
return false;
// Return early (for debugging purposes)
if (m_maxReadData > 0 && m_currOffset >= m_maxReadData)
return false;
if (m_currOffset >= m_filePositionMax)
return false;
Matrix<ElemType>* labels = nullptr; // labels to return, or NULL if no labels in matrix set
if (matrices.HasInput(m_labelName))
{
labels = &matrices.GetInputMatrix<ElemType>(m_labelName);
if (labels->GetNumRows() != 1)
RuntimeError("SparsePCReader only supports single label value per column but the network expected %d.", (int) labels->GetNumRows());
}
std::vector<int32_t> currIndex = std::vector<int32_t>(m_featureCount);
for (int i = 0; i < m_featureCount; i++)
{
currIndex[i] = 0;
}
size_t j = 0;
for (j = 0; j < m_miniBatchSize && m_currOffset < m_filePositionMax; j++)
{
for (int i = 0; i < m_featureCount; i++)
{
m_colIndices[i][j] = currIndex[i];
int32_t nnz = *(int32_t*) ((char*) m_dataBuffer + m_currOffset);
m_currOffset += sizeof(int32_t);
if (nnz > m_dims[i] / m_sparsenessFactor)
{
RuntimeError("Input data is too dense - not enough memory allocated");
}
memcpy(m_values[i] + currIndex[i], (char*) m_dataBuffer + m_currOffset, sizeof(ElemType) * nnz);
m_currOffset += (sizeof(ElemType) * nnz);
memcpy(m_rowIndices[i] + currIndex[i], (char*) m_dataBuffer + m_currOffset, sizeof(int32_t) * nnz);
m_currOffset += (sizeof(int32_t) * nnz);
currIndex[i] += nnz;
}
ElemType label = *(ElemType*) ((char*) m_dataBuffer + m_currOffset);
m_labelsBuffer[j] = label;
m_currOffset += sizeof(ElemType);
if (m_verificationCode != 0)
{
int32_t verifCode = *(int32_t*) ((char*) m_dataBuffer + m_currOffset);
if (verifCode != m_verificationCode)
{
RuntimeError("Verification code did not match (expected %d) - error in reading data", m_verificationCode);
return false;
}
m_currOffset += sizeof(int32_t);
}
}
for (int i = 0; i < m_featureCount; i++)
{
m_colIndices[i][j] = currIndex[i];
Matrix<ElemType>& features = matrices.GetInputMatrix<ElemType>(m_featureNames[i]);
if (features.GetFormat() != MatrixFormat::matrixFormatSparseCSC)
features.SwitchToMatrixType(MatrixType::SPARSE, MatrixFormat::matrixFormatSparseCSC, false);
features.SetMatrixFromCSCFormat(m_colIndices[i], m_rowIndices[i], m_values[i], currIndex[i], m_dims[i], j);
}
if (m_returnDense || m_doGradientCheck)
{
for (int i = 0; i < m_featureCount; i++)
matrices.GetInputMatrix<ElemType>(m_featureNames[i]).SwitchToMatrixType(MatrixType::DENSE, MatrixFormat::matrixFormatDense, true);
}
if (labels)
{
labels->Resize(1, j);
labels->SetValue((ElemType) 0);
labels->SetValue(1, j, labels->GetDeviceId(), m_labelsBuffer, 0);
}
// create the MBLayout
// Each sample consists of a "sequence" of 'm_microBatchSize' samples.
// TODO: They are not really temporally ordered, so a better way would be to use tensors, once that is ready.
m_pMBLayout->Init(j / m_microBatchSize, m_microBatchSize);
for (size_t s = 0; s < j / m_microBatchSize; s++)
m_pMBLayout->AddSequence(NEW_SEQUENCE_ID, s, 0, m_microBatchSize);
return true;
}
template <class ElemType>
bool SparsePCReader<ElemType>::DataEnd() { return true; }
// GetLabelMapping - Gets the label mapping from integer index to label type
// returns - a map from numeric datatype to native label type
template <class ElemType>
const std::map<IDataReader::LabelIdType, IDataReader::LabelType>& SparsePCReader<ElemType>::GetLabelMapping(const std::wstring& /*sectionName*/)
{
return m_mapIdToLabel;
}
// SetLabelMapping - Sets the label mapping from integer index to label
// labelMapping - mapping table from label values to IDs (must be 0-n)
// note: for tasks with labels, the mapping table must be the same between a training run and a testing run
template <class ElemType>
void SparsePCReader<ElemType>::SetLabelMapping(const std::wstring& /*sectionName*/, const std::map<IDataReader::LabelIdType, LabelType>& labelMapping)
{
m_mapIdToLabel = labelMapping;
m_mapLabelToId.clear();
for (std::pair<unsigned, LabelType> var : labelMapping)
{
m_mapLabelToId[var.second] = var.first;
}
}
// instantiate all the combinations we expect to be used
template class SparsePCReader<double>;
template class SparsePCReader<float>;
} } }