// // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE.md file in the project root for full license information. // #define _CRT_SECURE_NO_WARNINGS #define _SCL_SECURE_NO_WARNINGS #include #define __STDC_FORMAT_MACROS #include #include "SequencePacker.h" #include "ElementTypeUtils.h" namespace Microsoft { namespace MSR { namespace CNTK { MBLayoutPtr SequencePacker::CreateMBLayout(const StreamBatch& batch) { vector infos; for (size_t index = 0; index < batch.size(); ++index) { MBLayout::SequenceInfo info; info.seqId = index; info.tBegin = 0; info.tEnd = batch[index]->m_numberOfSamples; infos.push_back(info); } vector> placement; vector rowAllocations; // Creating the minibatch layout. MBLayoutPtr pMBLayout = make_shared(); pMBLayout->InitAsPackedSequences(infos, placement, rowAllocations); return pMBLayout; } Minibatch SequencePacker::ReadMinibatch() { auto sequences = m_sequenceEnumerator->GetNextSequences(m_minibatchSize); const auto& batch = sequences.m_data; Minibatch minibatch(sequences.m_endOfEpoch); if (batch.empty()) { return minibatch; } auto& currentBuffer = m_streamBuffers[m_currentBufferIndex]; assert(m_outputStreamDescriptions.size() == batch.size()); for (int streamIndex = 0; streamIndex < batch.size(); ++streamIndex) { const auto& streamBatch = batch[streamIndex]; if (m_checkSampleShape[streamIndex]) { CheckSampleShape(streamBatch, m_outputStreamDescriptions[streamIndex]); } const auto& type = m_outputStreamDescriptions[streamIndex]->m_storageType; auto pMBLayout = (type == StorageType::dense) ? PackDenseStream(streamBatch, streamIndex) : PackSparseStream(streamBatch, streamIndex); auto& buffer = currentBuffer[streamIndex]; auto streamMinibatch = std::make_shared(); streamMinibatch->m_data = buffer.m_data.get(); streamMinibatch->m_layout = pMBLayout; minibatch.m_data.push_back(streamMinibatch); } m_currentBufferIndex = (m_currentBufferIndex + 1) % m_numberOfBuffers; return minibatch; } void SequencePacker::CheckSampleShape(const std::vector& minibatch, StreamDescriptionPtr outputStream) { assert(!minibatch.empty()); // TODO: This should come from the network - layout that network expects. // TODO: In this case we can make outputStream const. // Currently it is not coming from SGD/Network, so we assume the first one is correct. if (outputStream->m_sampleLayout == nullptr) { outputStream->m_sampleLayout = minibatch.front()->m_sampleLayout; } for (const auto& s : minibatch) { if (s->m_sampleLayout == nullptr) { LogicError("Unknown shape of the sequence in stream '%ls'.", outputStream->m_name.c_str()); } if (*s->m_sampleLayout != *outputStream->m_sampleLayout) { RuntimeError("Packer currently does not support samples with varying shapes." "Please make sure there is a transform that unifies the shape of samples for input stream '%ls' " "or the deserializer provides samples with the same shape.", outputStream->m_name.c_str()); } } } MBLayoutPtr SequencePacker::PackDenseStream(const StreamBatch& batch, size_t streamIndex) { assert(m_outputStreamDescriptions[streamIndex]->m_storageType == StorageType::dense); const auto& stream = m_inputStreamDescriptions[streamIndex]; auto& buffer = m_streamBuffers[m_currentBufferIndex][streamIndex]; size_t sampleSize = GetSampleSize(m_outputStreamDescriptions[streamIndex]); auto pMBLayout = CreateMBLayout(batch); size_t requiredSize = pMBLayout->GetNumCols() * sampleSize; if (buffer.m_size < requiredSize) { buffer.Resize(requiredSize); } auto elementSize = GetSizeByType(stream->m_elementType); const auto& sequenceInfos = pMBLayout->GetAllSequences(); // Iterate over sequences in the layout, copy samples from the // source sequences into the buffer (at appropriate offsets). for (int i = 0; i < sequenceInfos.size(); ++i) { const auto& sequenceInfo = sequenceInfos[i]; // skip gaps if (sequenceInfo.seqId == GAP_SEQUENCE_ID) { continue; } const auto& sequence = batch[sequenceInfo.seqId]; size_t numSamples = sequence->m_numberOfSamples; assert(numSamples == sequenceInfo.GetNumTimeSteps()); char* bufferPtr = buffer.m_data.get(); // Iterate over all samples in the sequence, keep track of the sample offset (which is especially // important for sparse input, where offset == number of preceding nnz elements). for (size_t sampleIndex = 0, sampleOffset = 0; sampleIndex < numSamples; ++sampleIndex) { // Compute the offset into the destination buffer, using the layout information // to get the column index corresponding to the given sample. auto destinationOffset = pMBLayout->GetColumnIndex(sequenceInfo, sampleIndex) * sampleSize; // verify that there's enough space left in the buffer to fit a full sample. assert(destinationOffset <= buffer.m_size - sampleSize); auto* destination = bufferPtr + destinationOffset; if (stream->m_storageType == StorageType::dense) { // verify that the offset (an invariant for dense). assert(sampleOffset == sampleIndex * sampleSize); PackDenseSample(destination, sequence, sampleOffset, sampleSize); sampleOffset += sampleSize; } else if (stream->m_storageType == StorageType::sparse_csc) { // TODO: make type casts members of the SparseSequenceData SparseSequenceDataPtr sparseSequence = static_pointer_cast(sequence); // make sure that the sequence meta-data is correct. assert(numSamples == sparseSequence->m_nnzCounts.size()); PackSparseSampleAsDense(destination, sparseSequence, sampleIndex, sampleOffset, sampleSize, elementSize); // move the offset by nnz count of the sample. sampleOffset += sparseSequence->m_nnzCounts[sampleIndex]; // verify that the offset is within the bounds (less or equal // to the total nnz count of the sequence). assert(sampleOffset <= sparseSequence->m_totalNnzCount); } else { RuntimeError("Storage type %d is not supported.", (int)stream->m_storageType); } } } return pMBLayout; } MBLayoutPtr SequencePacker::PackSparseStream(const StreamBatch& batch, size_t streamIndex) { assert(m_outputStreamDescriptions[streamIndex]->m_storageType == StorageType::sparse_csc); // compute the aggregate nnz count of all the sequence in the batch. size_t nnzCount = 0; for (const auto& sequence : batch) { SparseSequenceDataPtr sparseSequence = static_pointer_cast(sequence); nnzCount += sparseSequence->m_totalNnzCount; } if (nnzCount > numeric_limits::max()) { RuntimeError("Minibatch NNZ count (%" PRIu64 ") exceeds the maximum allowed " "value (%" PRIu64 ")\n", nnzCount, (size_t)numeric_limits::max()); } const auto& stream = m_inputStreamDescriptions[streamIndex]; assert(stream->m_storageType == StorageType::sparse_csc); auto elementSize = GetSizeByType(stream->m_elementType); auto indexSize = sizeof(IndexType); auto pMBLayout = CreateMBLayout(batch); // Compute the required buffer size: // size of nnz type + nnz * (size of the element type) + nnz * (size of the row index type) + // (number of columns + 1) * (size of the column index type). size_t requiredSize = sizeof(nnzCount) + nnzCount * (elementSize + indexSize) + indexSize * (pMBLayout->GetNumCols() + 1); auto& buffer = m_streamBuffers[m_currentBufferIndex][streamIndex]; if (buffer.m_size < requiredSize) { buffer.Resize(requiredSize); } auto* destination = buffer.m_data.get(); // insert the nnzCount as the first element in the buffer. memcpy(destination, &nnzCount, sizeof(nnzCount)); // create two pointers to the memory blocks inside the buffer, // one for data portion and anther -- for indices. auto* dataDst = destination + sizeof(nnzCount); auto* indicesDst = dataDst + elementSize* nnzCount; // column index for the current sample (= number of nnz value packed so far). IndexType columnOffset = 0; // a vector to store column index for each sample in the resulting (packed) matrix. vector sparseColumnIndices; // a vector to keep track of the offsets into each input sequence, // there an offset is the number of nnz values packed so far. Current sample // values/indices start of the offset position in the sequence data/index array vector sequenceOffsets(batch.size(), 0); vector sequenceInfos(pMBLayout->GetAllSequences()); // sort the vector in ascending order of the parallel sequence index. sort(sequenceInfos.begin(), sequenceInfos.end(), [](const MBLayout::SequenceInfo& a, const MBLayout::SequenceInfo& b){ return a.s < b.s; }); // Iterate over the all time steps in the layout (total number of samples/columns // in a parallel sequence), traversing the layout in horizontal direction. for (auto timeStep = 0; timeStep < pMBLayout->GetNumTimeSteps(); ++timeStep) { // For each time step, iterate over all sequences in the minibatch, // traversing the layout in vertical direction. for (const auto& sequenceInfo : sequenceInfos) { // skip the sequence if it does not intersect with the time step if (timeStep < sequenceInfo.tBegin || timeStep >= sequenceInfo.tEnd) { continue; } // store the offset of the current column )... sparseColumnIndices.push_back(columnOffset); auto seqId = sequenceInfo.seqId; if (seqId == GAP_SEQUENCE_ID) { continue; } // compute the index of the sample inside the sequence. size_t sampleIndex = timeStep - sequenceInfo.tBegin; const auto& sequence = batch[seqId]; // make sure the index less than the sequence length in samples. assert(sampleIndex < sequence->m_numberOfSamples); auto& sequenceOffset = sequenceOffsets[seqId]; SparseSequenceDataPtr sparseSequence = static_pointer_cast(sequence); IndexType nnz = sparseSequence->m_nnzCounts[sampleIndex]; // compute the sample offset in bytes. size_t sampleOffset = sequenceOffset * elementSize; // copy all nzz values from source sequence into the buffer. const auto* dataSrc = reinterpret_cast(sequence->GetDataBuffer()) + sampleOffset; memcpy(dataDst, dataSrc, nnz * elementSize); dataDst += nnz * elementSize; // advance the destination pointer // copy all nzz value indices from source sequence into the buffer. const auto* indicesSrc = sparseSequence->m_indices + sequenceOffset; memcpy(indicesDst, indicesSrc, nnz * indexSize); indicesDst += nnz * indexSize; // advance the destination pointer sequenceOffset += nnz; columnOffset += nnz; } } // at this point each element in sequenceOffsets should be equal to the total // nnz count of the respective sequence and the sum of all elements - to the // overall nnz count. assert(accumulate(sequenceOffsets.begin(), sequenceOffsets.end(), 0) == nnzCount); // check the distance between data and index destination pointers. assert(indicesDst == dataDst + nnzCount * indexSize); // after we packed all samples, the column offset must be equal to the total nnz count. assert(columnOffset == nnzCount); sparseColumnIndices.push_back(columnOffset); // check that the number of column indices == N + 1 (where N is the number of // column in the packed matrix) assert((pMBLayout->GetNumCols() + 1) == sparseColumnIndices.size()); // verify that there's enough space in the buffer for the array of column indices. assert(indicesDst + sparseColumnIndices.size()*indexSize <= destination + requiredSize); // copy column indices into the buffer. memcpy(indicesDst, sparseColumnIndices.data(), sparseColumnIndices.size() * indexSize); return pMBLayout; } }}}