//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
// LibSVMBinaryReader.cpp : Defines the exported functions for the DLL application.
//

#include "stdafx.h"
#define DATAREADER_EXPORTS // creating the exports here
#include "DataReader.h"
#include "LibSVMBinaryReader.h"
#include "fileutil.h" // for fexists()
#include <random>
#include <map>
#include <vector>
#include <algorithm>
#include <ctime>
#include "CUDAPageLockedMemAllocator.h"
#include <thread>
#include <future>
#ifndef _WIN32
// platform headers (non-Windows builds)
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <limits.h>
#include <errno.h>
#endif

namespace Microsoft { namespace MSR { namespace CNTK {

template <class ElemType>
std::shared_ptr<ElemType> AllocateIntermediateBuffer(int deviceId, size_t numElements)
{
    if (deviceId >= 0)
    {
        // Use pinned memory for GPU devices for better copy performance
        size_t totalSize = sizeof(ElemType) * numElements;
        return std::shared_ptr<ElemType>((ElemType*) CUDAPageLockedMemAllocator::Malloc(totalSize, deviceId),
                                         [deviceId](ElemType* p)
                                         {
                                             CUDAPageLockedMemAllocator::Free(p, deviceId);
                                         });
    }

    // Otherwise use usual CPU memory.
    return std::shared_ptr<ElemType>(new ElemType[numElements],
                                     [](ElemType* p)
                                     {
                                         delete[] p;
                                     });
}

DWORD HIDWORD(size_t size)
{
    return size >> 32;
}

DWORD LODWORD(size_t size)
{
    return size & 0xFFFFFFFF;
}

template <class ElemType>
DenseBinaryMatrix<ElemType>::DenseBinaryMatrix(wstring name, int deviceID, size_t numRows, size_t numCols)
    : BinaryMatrix<ElemType>(name, deviceID, numRows, numCols)
{
    // this-> is required for name resolution in gcc, otherwise the compiler complains.
    this->m_values = AllocateIntermediateBuffer<ElemType>(deviceID, numRows * numCols);
}

template <class ElemType>
void DenseBinaryMatrix<ElemType>::Clear()
{
    this->m_numRows = 0;
}

template <class ElemType>
void DenseBinaryMatrix<ElemType>::Dispose()
{
    if (this->m_values != nullptr)
    {
        this->m_values = nullptr;
    }
}

template <class ElemType>
void DenseBinaryMatrix<ElemType>::Fill(Matrix<ElemType>* matrix)
{
    // Each sample occupies one column of the minibatch matrix, so the buffer's rows become matrix columns.
    matrix->SetValue(this->m_maxNumCols, this->m_numRows, matrix->GetDeviceId(), this->m_values.get(), matrixFlagNormal);
#if DEBUG
    matrix->Print("testname");
#endif
}

template <class ElemType>
void DenseBinaryMatrix<ElemType>::SetMaxRows(size_t maxRows)
{
    if (maxRows > this->m_maxNumRows)
    {
        auto values = AllocateIntermediateBuffer<ElemType>(this->m_deviceID, this->m_maxNumCols * maxRows);
        if (this->m_values != nullptr)
        {
            if (this->m_numRows > 0)
            {
                memcpy(values.get(), this->m_values.get(), sizeof(ElemType) * this->m_numRows * this->m_maxNumCols);
            }
        }
        this->m_values = values;
        this->m_maxNumRows = maxRows;
    }
}

template <class ElemType>
void DenseBinaryMatrix<ElemType>::AddValues(void* values, size_t numRows)
{
    memcpy(this->m_values.get() + this->m_numRows * this->m_maxNumCols, values, sizeof(ElemType) * numRows * this->m_maxNumCols);
    this->m_numRows += numRows;
}

template <class ElemType>
SparseBinaryMatrix<ElemType>::SparseBinaryMatrix(wstring name, int deviceId, size_t numRows, size_t numCols)
    : BinaryMatrix<ElemType>(name, deviceId, numRows, numCols), m_rowIndices(nullptr), m_colIndices(nullptr), m_nnz(0), m_maxNNz(0)
{
    m_colIndices = AllocateIntermediateBuffer<int32_t>(deviceId, numRows + 1);
    m_colIndices.get()[0] = 0;
}

template <class ElemType>
void SparseBinaryMatrix<ElemType>::Dispose()
{
    this->m_values = nullptr;
    m_colIndices = nullptr;
    m_rowIndices = nullptr;
}

template <class ElemType>
void SparseBinaryMatrix<ElemType>::SetMaxRows(size_t maxRows)
{
    if (maxRows > this->m_maxNumRows)
    {
        std::shared_ptr<int32_t> colIndices = AllocateIntermediateBuffer<int32_t>(this->m_deviceID, maxRows + 1);
        if (this->m_colIndices != nullptr)
        {
            if (this->m_numRows > 0)
            {
                memcpy(colIndices.get(), m_colIndices.get(), sizeof(int32_t) * (this->m_numRows + 1));
            }
        }
        this->m_colIndices = colIndices;
        this->m_maxNumRows = maxRows;
    }
}

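// SparseBinaryMatrix accumulates micro-batches in CSC form: m_values and m_rowIndices hold the
// nonzero entries, and m_colIndices holds one offset per column (sample) plus a terminating entry.
// ResizeArrays below grows the nonzero buffers geometrically (by roughly 1.3x) so that repeated
// AddValues/AddRowIndices calls amortize the cost of reallocation.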
template <class ElemType>
void SparseBinaryMatrix<ElemType>::ResizeArrays(size_t newNNz)
{
    if (newNNz + m_nnz < this->m_maxNNz)
    {
        return;
    }
    size_t newMaxNNz = (size_t)((newNNz + m_nnz) * 1.3);
    auto rowIndices = AllocateIntermediateBuffer<int32_t>(this->m_deviceID, newMaxNNz);
    auto values = AllocateIntermediateBuffer<ElemType>(this->m_deviceID, newMaxNNz);
    // copy old values if any
    if (m_nnz > 0)
    {
        assert(m_rowIndices && this->m_values); // m_nnz > 0 implies that these pointers are allocated
        memcpy(rowIndices.get(), m_rowIndices.get(), sizeof(int32_t) * m_maxNNz);
        memcpy(values.get(), this->m_values.get(), sizeof(ElemType) * m_maxNNz);
    }
    m_rowIndices = rowIndices;
    this->m_values = values;
    m_maxNNz = newMaxNNz;
}

template <class ElemType>
void SparseBinaryMatrix<ElemType>::Clear()
{
    this->m_numRows = 0;
    m_nnz = 0;
}

template <class ElemType>
void SparseBinaryMatrix<ElemType>::AddValues(void* values, size_t nnz)
{
    memcpy(this->m_values.get() + this->m_nnz, values, sizeof(ElemType) * nnz);
}

template <class ElemType>
void SparseBinaryMatrix<ElemType>::AddColIndices(void* colIndices, size_t numCols)
{
    int32_t* temp = (int32_t*) colIndices;
    for (int c = 1; c < numCols; c++)
    {
        m_colIndices.get()[this->m_numRows + c] = temp[c] + (int32_t) this->m_nnz;
    }
    this->m_numRows += numCols - 1;
}

template <class ElemType>
void SparseBinaryMatrix<ElemType>::AddRowIndices(void* rowIndices, size_t nnz)
{
    memcpy(m_rowIndices.get() + this->m_nnz, rowIndices, sizeof(int32_t) * nnz);
}

template <class ElemType>
void SparseBinaryMatrix<ElemType>::Fill(Matrix<ElemType>* matrix)
{
    matrix->SetMatrixFromCSCFormat(m_colIndices.get(), m_rowIndices.get(), this->m_values.get(), this->m_nnz, this->m_maxNumCols, this->m_numRows);
#if DEBUG
    matrix->Print("testname");
#endif
}

template <class ElemType>
SparseBinaryInput<ElemType>::SparseBinaryInput(std::wstring fileName)
    : m_fileName(fileName), m_readOrder(nullptr), m_readOrderLength(0), m_randomize(false), m_tempValues(nullptr), m_tempValuesSize(0), m_offsets(nullptr), m_offsetsStart(0), m_startMB(0), m_endMB(0)
{
    std::string name = msra::strfun::utf8(m_fileName);
    m_inFile.open(name, ifstream::binary | ifstream::in);
}

template <class ElemType>
SparseBinaryInput<ElemType>::~SparseBinaryInput()
{
}

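// On-disk layout parsed by Init():
//   int64  numRows, int64 numBatches, int32 numFeatures, int32 numLabels
//   per feature stream: int32 nameLen, char name[nameLen], int32 numCols
//   per label stream:   int32 nameLen, char name[nameLen], int32 numCols
//   int64  offsets[numBatches]   -- byte offsets of the micro-batch chunks, relative to the data section
//   micro-batch chunks           -- each chunk begins with an int32 sample count (see ReadMinibatch)
// Init() records where the offset table (m_offsetsStart) and the data section (m_dataStart) begin,
// and takes the first chunk's sample count as the nominal micro-batch size.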
template <class ElemType>
void SparseBinaryInput<ElemType>::Init(std::map<std::wstring, std::wstring> rename)
{
    size_t base_offset = 0;

    m_inFile.seekg(0, ifstream::beg);

    m_inFile.read((char*) &m_numRows, sizeof(int64_t));
    base_offset += sizeof(int64_t);

    m_inFile.read((char*) &m_numBatches, sizeof(int64_t));
    base_offset += sizeof(int64_t);

    int32_t numFeatures;
    int32_t numLabels;

    m_inFile.read((char*) &numFeatures, sizeof(int32_t));
    base_offset += sizeof(int32_t);

    m_inFile.read((char*) &numLabels, sizeof(int32_t));
    base_offset += sizeof(int32_t);

    int32_t len;
    int32_t numCols;
    int32_t maxLen = 100;
    char* tempName = (char*) malloc(maxLen);
    for (int32_t c = 0; c < numFeatures; c++)
    {
        m_inFile.read((char*) &len, sizeof(int32_t));
        if (len + 1 > maxLen)
        {
            maxLen = len + 1;
            free(tempName);
            tempName = (char*) malloc(maxLen);
        }
        base_offset += sizeof(int32_t);
        m_inFile.read((char*) tempName, len);
        tempName[len] = '\0';
        // std::string name((char*)header_buffer + base_offset, len);
        std::wstring wname = msra::strfun::utf16(tempName);
        if (rename.find(wname) == rename.end())
        {
            m_features.emplace_back(wname);
        }
        else
        {
            m_features.emplace_back(rename[wname]);
        }
        base_offset += sizeof(int8_t) * len;

        m_inFile.read((char*) &numCols, sizeof(numCols));
        // numCols = *(int32_t*)((char*)header_buffer + base_offset);
        base_offset += sizeof(int32_t);

        m_mappedNumCols[m_features.back()] = numCols;
    }
    for (int32_t c = 0; c < numLabels; c++)
    {
        m_inFile.read((char*) &len, sizeof(int32_t));
        if (len + 1 > maxLen)
        {
            maxLen = len + 1;
            free(tempName);
            tempName = (char*) malloc(maxLen);
        }
        base_offset += sizeof(int32_t);
        // std::string name((char*)header_buffer + base_offset, len);
        m_inFile.read((char*) tempName, len);
        tempName[len] = '\0';
        std::wstring wname = msra::strfun::utf16(tempName);
        if (rename.find(wname) == rename.end())
        {
            m_labels.emplace_back(wname);
        }
        else
        {
            // m_features.emplace_back(rename[wname]);
            m_labels.emplace_back(rename[wname]);
        }
        base_offset += sizeof(int8_t) * len;
        // numCols = *(int32_t*)((char*)header_buffer + base_offset);
        m_inFile.read((char*) &numCols, sizeof(numCols));
        base_offset += sizeof(int32_t);

        m_mappedNumCols[m_labels.back()] = numCols;
    }
    free(tempName);

    m_offsetsStart = base_offset;
    m_dataStart = m_offsetsStart + m_numBatches * sizeof(int64_t);

    /* Read in the microbatch size here */
    m_inFile.seekg(m_dataStart, ios::beg);
    m_inFile.read((char*) &m_microBatchSize, sizeof(int32_t));
    m_mbSize = (size_t) m_microBatchSize;

    m_inFile.seekg(0, ios::end);
    m_fileSize = (size_t) m_inFile.tellg();
    m_maxMBSize = 0;
}

template <class ElemType>
bool SparseBinaryInput<ElemType>::Randomize()
{
    return false;
    /*
    if (m_randomize > 0)
    {
        return true;
    }
    return false;
    */
}

template <class ElemType>
void SparseBinaryInput<ElemType>::FillReadOrder(size_t windowSize)
{
    if (m_readOrder != nullptr)
    {
        free(m_readOrder);
    }
    m_readOrder = (size_t*) malloc(sizeof(size_t) * windowSize);
    for (size_t c = 0; c < windowSize; c++)
    {
        m_readOrder[c] = c;
    }
}

template <class ElemType>
void SparseBinaryInput<ElemType>::ReadOffsets(size_t startMB, size_t numMBs)
{
    if (startMB == m_startMB && m_endMB == startMB + numMBs)
    {
        return;
    }
    if (m_offsets != nullptr)
    {
        free(m_offsets);
    }
    m_offsets = (int64_t*) malloc(sizeof(int64_t) * (numMBs + 1));
    m_inFile.seekg(m_offsetsStart + startMB * sizeof(int64_t), ios::beg);
    m_inFile.read((char*) &(m_offsets[0]), sizeof(int64_t) * numMBs);
    if (startMB + numMBs < m_numBatches)
    {
        m_inFile.read((char*) &(m_offsets[numMBs]), sizeof(int64_t));
    }
    else
    {
        m_offsets[numMBs] = m_fileSize;
    }
    m_startMB = startMB;
    m_endMB = startMB + numMBs;
}

template <class ElemType>
void SparseBinaryInput<ElemType>::Shuffle()
{
    if (Randomize())
    {
        shuffle(&(m_readOrder[0]), &(m_readOrder[m_readOrderLength - 1]), m_randomEngine); // ToDo: move to different shuffle implementation to be platform independent
    }
}

template <class ElemType>
void SparseBinaryInput<ElemType>::StartDistributedMinibatchLoop(size_t mbSize, size_t subsetNum, size_t numSubsets)
{
    m_nextMB = 0;
    m_mbSize = mbSize / numSubsets;

    m_subsetNum = subsetNum;
    m_numSubsets = numSubsets;

    m_epochSize = m_numBatches / numSubsets;
    size_t startMB = m_epochSize * subsetNum;
    size_t endMB = m_epochSize * (subsetNum + 1);

    size_t remainder = m_numBatches % numSubsets;

    size_t lb = min(remainder, subsetNum);
    size_t ub = min(remainder, subsetNum + 1);
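    // Distribute the remainder micro-batches over the first `remainder` subsets so the split is as
    // even as possible. For example, with m_numBatches = 10 and numSubsets = 4 the subsets get epoch
    // sizes 3, 3, 2, 2 and read the windows [0,3), [3,6), [6,8), [8,10).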
    m_epochSize += (subsetNum < remainder) ? 1 : 0;

    startMB += lb;
    endMB += ub;
    m_windowSize = endMB - startMB;

    if (m_windowSize != m_readOrderLength)
    {
        FillReadOrder(m_windowSize);
        m_readOrderLength = m_windowSize;
    }

    Shuffle();
    ReadOffsets(startMB, m_windowSize);

    size_t maxMBSize = 0;
    for (size_t c = 0; c < m_windowSize; c++)
    {
        maxMBSize = max(maxMBSize, (size_t)(m_offsets[c + 1] - m_offsets[c]));
        // fprintf(stderr, "m_offsets[%lu] = %lu\n", c, m_offsets[c]);
    }
    if (maxMBSize > m_maxMBSize)
    {
        m_maxMBSize = maxMBSize;
        while (m_dataToProduce.size() > 0)
        {
            free(m_dataToProduce.pop());
        }
        // fprintf(stderr, "max mb size: %ld\n", m_maxMBSize);
        size_t maxMem = 1024 * 1024 * 1024; // 1GB
        size_t maxPointers = maxMem / m_maxMBSize;
        for (size_t c = 0; c < maxPointers; c++)
        {
            void* dataBuffer = malloc(m_maxMBSize);
            m_dataToProduce.push(dataBuffer);
        }
    }

    std::thread readData([this]
                         {
                             this->ReadMinibatches(m_readOrder, m_readOrderLength);
                         });
    readData.detach();
}

template <class ElemType>
void* SparseBinaryInput<ElemType>::GetTempDataPointer(size_t numBytes)
{
    if (m_tempValuesSize < numBytes)
    {
        if (m_tempValues != nullptr)
        {
            free(m_tempValues);
        }
        m_tempValuesSize = (int32_t)(numBytes * 1.3);
        m_tempValues = malloc(m_tempValuesSize);
    }
    return m_tempValues;
}

template <class ElemType>
shared_ptr<BinaryMatrix<ElemType>> SparseBinaryInput<ElemType>::CreateMatrix(std::wstring matName, int deviceId)
{
    shared_ptr<BinaryMatrix<ElemType>> retVal; // = nullptr;
    // if (m_features.find(matName) != m_features.end()) {
    if (std::find(m_features.begin(), m_features.end(), matName) != m_features.end())
    {
        retVal = make_shared<SparseBinaryMatrix<ElemType>>(matName, deviceId, m_mbSize, m_mappedNumCols[matName]);
    }
    // else if (m_labels.find(matName) != m_labels.end()) {
    else if (std::find(m_labels.begin(), m_labels.end(), matName) != m_labels.end())
    {
        retVal = make_shared<DenseBinaryMatrix<ElemType>>(matName, deviceId, m_mbSize, m_mappedNumCols[matName]);
    }

    return retVal;
}

template <class ElemType>
void SparseBinaryInput<ElemType>::ReadMinibatches(size_t* read_order, size_t numToRead)
{
#if DEBUG
    marker_series series(L"Read Minibatches");
    // diagnostic::span span(series, L"Reading Data");
    span* read_span;
#endif
    for (size_t c = 0; c < numToRead; c++)
    {
#if DEBUG
        read_span = new span(series, 1, L"Getting Buffer %ld\n", c);
#endif
        // fprintf(stderr, "start reading data %ld\n", c);
        size_t readSize = m_offsets[read_order[c] + 1] - m_offsets[read_order[c]];
        // void* data_buffer = GetTempDataPointer(readSize);
#if DEBUG
        series.write_flag(_T("Getting buffer."));
#endif
        void* data_buffer = m_dataToProduce.pop();
#if DEBUG
        delete read_span;
        series.write_flag(_T("Got buffer."));
        read_span = new span(series, 2, L"Reading Data %ld\n", c);
#endif

        m_inFile.clear();
#if DEBUG
        series.write_flag(_T("seeking."));
#endif
        // index with read_order[c] so the seek position matches readSize computed above
        m_inFile.seekg(m_dataStart + m_offsets[read_order[c]], ios::beg);
#if DEBUG
        series.write_flag(_T("reading."));
#endif
        m_inFile.read((char*) data_buffer, readSize);

        m_dataToConsume.push(data_buffer);
        // fprintf(stderr, "done reading data %ld\n", c);
#if DEBUG
        series.write_flag(_T("Done read, pushed buffer."));
        delete read_span;
#endif
    }
    // m_dataToConsume.push(nullptr);
#if DEBUG
    series.write_flag(_T("Done reading."));
#endif
}

template <class ElemType>
size_t SparseBinaryInput<ElemType>::ReadMinibatch(void* data_buffer, std::map<std::wstring, shared_ptr<BinaryMatrix<ElemType>>>& matrices)
{
    // fprintf(stderr, "start read minibatch.\n");
    /*
    size_t readSize = m_offsets[cur_batch + 1] - m_offsets[cur_batch];
    void* data_buffer = GetTempDataPointer(readSize);

    fprintf(stderr, "start reading data.\n");
    m_inFile.clear();
    m_inFile.seekg(m_dataStart + m_offsets[cur_batch], ios::beg);
    m_inFile.read((char*)data_buffer, readSize);
    fprintf(stderr, "done reading data.\n");
    */

    int32_t nnz;
    int32_t curMBSize;

    int64_t buffer_offset = 0;
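    // Each micro-batch chunk is laid out as:
    //   int32 curMBSize                                  -- number of samples in this chunk
    //   per feature stream (in m_features order):
    //     int32 nnz, ElemType values[nnz], int32 rowIndices[nnz], int32 colIndices[curMBSize + 1]
    //   per label stream (in m_labels order):
    //     ElemType values[curMBSize * numCols]           -- dense, one column per sample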
    curMBSize = *(int32_t*) ((char*) data_buffer + buffer_offset);
    buffer_offset += sizeof(int32_t);

    for (int32_t c = 0; c < m_features.size(); c++)
    {
        // fprintf(stderr, "read features %d.\n", c);
        nnz = *(int32_t*) ((char*) data_buffer + buffer_offset);
        buffer_offset += sizeof(int32_t);

        ElemType* vals = (ElemType*) ((char*) data_buffer + buffer_offset);
        buffer_offset += sizeof(ElemType) * nnz;

        int32_t* rowIndices = (int32_t*) ((char*) data_buffer + buffer_offset);
        buffer_offset += sizeof(int32_t) * nnz;

        int32_t* colIndices = (int32_t*) ((char*) data_buffer + buffer_offset);
        buffer_offset += sizeof(int32_t) * (curMBSize + 1);

        auto findMat = matrices.find(m_features[c]);
        if (findMat != matrices.end())
        {
            auto mat = findMat->second;
            mat->ResizeArrays(nnz);
            mat->AddValues(vals, nnz);
            mat->AddRowIndices(rowIndices, nnz);
            mat->AddColIndices(colIndices, curMBSize + 1);
            mat->UpdateNNz(nnz);
#ifdef DEBUG
            mat->Print("features");
#endif
        }
    }

    for (int32_t c = 0; c < m_labels.size(); c++)
    {
        // fprintf(stderr, "read labels %d.\n", c);
        int32_t numCols = m_mappedNumCols[m_labels[c]];

        ElemType* vals = (ElemType*) ((char*) data_buffer + buffer_offset);
        buffer_offset += sizeof(ElemType) * curMBSize * numCols;

        auto findMat = matrices.find(m_labels[c]);
        if (findMat != matrices.end())
        {
            auto mat = findMat->second;
            mat->AddValues(vals, curMBSize);
#ifdef DEBUG
            mat->Print("labels");
#endif
        }
    }
    return (size_t) curMBSize;
}

template <class ElemType>
size_t SparseBinaryInput<ElemType>::FillMatrices(std::map<std::wstring, shared_ptr<BinaryMatrix<ElemType>>>& matrices)
{
    // fprintf(stderr, "start fill matrices\n");
    size_t curSize = 0;
    for (auto mat : matrices)
    {
        mat.second->SetMaxRows(m_mbSize);
        mat.second->Clear();
    }
    void* data_buffer;
    // fprintf(stderr, "start while\n");
    // clock_t start_w = clock();
    // while (curSize + m_microBatchSize <= m_mbSize && (data_buffer = m_dataToConsume.pop()) != nullptr) {
    while (curSize + m_microBatchSize <= m_mbSize && m_nextMB < m_epochSize)
    {
        data_buffer = m_dataToConsume.pop();
        // clock_t in_w = clock();
        // start_w = in_w - start_w;
        // fprintf(stderr, "start read mb\tIt took me %d clicks (%f seconds).\n", start_w, ((float)start_w) / CLOCKS_PER_SEC);
        // start_w = in_w;
        // fprintf(stderr, "start read mb\n");
        curSize += ReadMinibatch(data_buffer, matrices);
        // fprintf(stderr, "end read mb\n");
        m_nextMB++;
        m_dataToProduce.push(data_buffer);
    }
    // fprintf(stderr, "end fill matrices\n");
    return curSize;
}

template <class ElemType>
template <class ConfigRecordType>
void LibSVMBinaryReader<ElemType>::InitFromConfig(const ConfigRecordType& readerConfig)
{
    std::map<std::wstring, std::wstring> rename;
    RenamedMatrices(readerConfig, rename);

    if (readerConfig.Exists(L"randomize"))
    {
        string randomizeString = readerConfig(L"randomize");
        if (randomizeString == "None")
        {
            m_randomize = 0L;
        }
        else if (randomizeString == "Auto")
        {
            time_t rawtime;
            struct tm* timeinfo;
            time(&rawtime);
            timeinfo = localtime(&rawtime);
            m_randomize = (unsigned long) (timeinfo->tm_sec + timeinfo->tm_min * 60 + timeinfo->tm_hour * 60 * 60);
        }
        else
        {
            m_randomize = readerConfig(L"randomize", 0);
        }
    }
    else
    {
        m_randomize = 0L;
    }

    m_partialMinibatch = true;
    std::string minibatchMode(readerConfig(L"minibatchMode", "Partial"));
    m_partialMinibatch = EqualCI(minibatchMode, "Partial");

    std::wstring file = readerConfig(L"file", L"");

    m_dataInput = make_shared<SparseBinaryInput<ElemType>>(file);
    m_dataInput->Init(rename);

    m_mbSize = (size_t) readerConfig(L"minibatch", 0);
    if (m_mbSize > 0)
    {
        if (m_dataInput->GetMBSize() != m_mbSize)
        {
            RuntimeError("Data file and config file have mismatched minibatch sizes.\n");
            return;
        }
    }
    else
    {
        m_mbSize = m_dataInput->GetMBSize();
    }

    m_prefetchEnabled = true;
}
template <class ElemType>
void LibSVMBinaryReader<ElemType>::Destroy()
{
}

template <class ElemType>
LibSVMBinaryReader<ElemType>::~LibSVMBinaryReader()
{
    Destroy();
}

template <class ElemType>
void LibSVMBinaryReader<ElemType>::StartMinibatchLoop(size_t mbSize, size_t epoch, size_t requestedEpochSamples)
{
    return StartDistributedMinibatchLoop(mbSize, epoch, 0, 1, requestedEpochSamples);
}

template <class ElemType>
void LibSVMBinaryReader<ElemType>::StartDistributedMinibatchLoop(size_t mbSize, size_t epoch, size_t subsetNum, size_t numSubsets, size_t /*requestedEpochSamples*/)
{
    m_epoch = epoch;
    m_mbSize = mbSize;
#if DEBUG
    if (reader_series != NULL)
    {
        delete reader_series;
    }
    reader_series = new marker_series(L"Base Reader");
    cur_read = 0;
#endif
    m_dataInput->StartDistributedMinibatchLoop(mbSize, subsetNum, numSubsets);
}

template <class ElemType>
void LibSVMBinaryReader<ElemType>::CheckDataMatrices(StreamMinibatchInputs& matrices)
{
    if (m_dataMatrices.empty())
    {
        for (auto inmat : matrices)
        {
            shared_ptr<BinaryMatrix<ElemType>> mat = m_dataInput->CreateMatrix(inmat.first, inmat.second.matrix->GetDeviceId());
            if (mat != nullptr)
            {
                m_dataMatrices[inmat.first] = mat;
            }
        }
    }
}

template <class ElemType>
void LibSVMBinaryReader<ElemType>::DoDSSMMatrix(Matrix<ElemType>& mat, size_t actualMBSize)
{
    size_t numRows = mat.GetNumRows();
    if (DSSMCols < actualMBSize)
    {
        DSSMCols = actualMBSize;
        m_dssmLabels = AllocateIntermediateBuffer<ElemType>(mat.GetDeviceId(), numRows * actualMBSize);
        memset(m_dssmLabels.get(), 0, sizeof(ElemType) * numRows * actualMBSize);
        for (size_t c = 0; c < numRows * actualMBSize; c += numRows)
        {
            m_dssmLabels.get()[c] = 1;
        }
    }
    if (mat.GetNumCols() != actualMBSize)
    {
        mat.SetValue(numRows, actualMBSize, mat.GetDeviceId(), m_dssmLabels.get(), matrixFlagNormal);
    }
}

template <class ElemType>
bool LibSVMBinaryReader<ElemType>::TryGetMinibatch(StreamMinibatchInputs& matrices)
{
    // timer = clock();
#if DEBUG
    span minibatch_span(*reader_series, 1, L"Get Minibatch: %ld", cur_read);
#endif
    size_t actualMBSize = 0;
    if (m_prefetchEnabled)
    {
        if (!m_pendingAsyncGetMinibatch.valid())
        {
            // fprintf(stderr, "not valid\n");
            CheckDataMatrices(matrices);
            m_pendingAsyncGetMinibatch = std::async(std::launch::async, [this]()
                                                    {
                                                        return m_dataInput->FillMatrices(m_dataMatrices);
                                                    });
        }
        // fprintf(stderr, "before get.\n");
        // timer = clock();
#if DEBUG
        reader_series->write_flag(_T("before get."));
#endif
        actualMBSize = m_pendingAsyncGetMinibatch.get();
#if DEBUG
        reader_series->write_flag(_T("after get."));
#endif
        // timer = clock() - timer;
        // fprintf(stderr, "done get\tIt took me %d clicks (%f seconds).\n", timer, ((float)timer) / CLOCKS_PER_SEC);

        if (actualMBSize == 0)
        {
            return false;
        }

        m_pMBLayout->InitAsFrameMode(actualMBSize);
#if DEBUG
        reader_series->write_flag(_T("starting fill."));
#endif
        for (auto matrix : m_dataMatrices)
        {
            if (matrices.HasInput(matrix.first))
                matrix.second->Fill(&matrices.GetInputMatrix<ElemType>(matrix.first));
        }
#if DEBUG
        reader_series->write_flag(_T("done fill."));
#endif
        if (matrices.HasInput(L"DSSMLabel"))
            DoDSSMMatrix(matrices.GetInputMatrix<ElemType>(L"DSSMLabel"), actualMBSize);

        // kick off the next prefetch while the network consumes the current minibatch
        m_pendingAsyncGetMinibatch = std::async(std::launch::async, [this]()
                                                {
                                                    // CheckDataMatrices(matrices);
                                                    return m_dataInput->FillMatrices(m_dataMatrices);
                                                });
    }
#if DEBUG
    cur_read++;
#endif

    /*
    timer = clock() - timer;
    fprintf(stderr, "It took me %d clicks (%f seconds).\n", timer, ((float)timer) / CLOCKS_PER_SEC);
    */
    // fprintf(stderr, "done\n");
    return true;
}

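// RenamedMatrices scans the reader configuration for sub-records that carry a "rename" value and
// builds a map from the stream name stored in the data file to the input name the network expects.
// Illustrative (hypothetical) config fragment:
//   reader = [
//       readerType = "LibSVMBinaryReader"
//       file = "train.bin"
//       features = [ rename = "x" ]   # stream "features" in the file feeds the input named "x"
//   ]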
template <class ElemType>
template <class ConfigRecordType>
void LibSVMBinaryReader<ElemType>::RenamedMatrices(const ConfigRecordType& config, std::map<std::wstring, std::wstring>& rename)
{
    for (const auto& id : config.GetMemberIds())
    {
        if (!config.CanBeConfigRecord(id))
            continue;
        const ConfigRecordType& temp = config(id);
        // if this sub-record contains a "rename" element, use it to map the stream name to a new input name
        if (temp.ExistsCurrent(L"rename"))
        {
            std::wstring ren = temp(L"rename");
            rename.emplace(msra::strfun::utf16(id), msra::strfun::utf16(ren));
        }
    }
}

template <class ElemType>
bool LibSVMBinaryReader<ElemType>::DataEnd()
{
    return m_dataInput->DataEnd();
}

template <class ElemType>
bool SparseBinaryInput<ElemType>::DataEnd()
{
    return true;
}

// instantiate all the combinations we expect to be used
template class LibSVMBinaryReader<double>;
template class LibSVMBinaryReader<float>;
} } }