// // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE.md file in the project root for full license information. // #define _CRT_SECURE_NO_WARNINGS #define __STDC_FORMAT_MACROS #include #include "SequenceRandomizer.h" #include #include #include #include "RandomOrdering.h" namespace Microsoft { namespace MSR { namespace CNTK { SequenceRandomizer::SequenceRandomizer( int verbosity, IDataDeserializerPtr deserializer, ChunkRandomizerPtr chunkRandomizer) : m_verbosity(verbosity), m_randomizedChunks(chunkRandomizer->GetRandomizedChunks()), m_chunkWindowBegin(0), m_randomizedWindowEnd(0), m_randomizationCursor(0), m_chunkWindowEnd(0), m_currentSequenceCursor(0), m_currentChunkCursor(0), m_currentSampleCursor(0), m_deserializer(deserializer) { size_t max = 0; for (const auto& c : m_randomizedChunks) { if (max < c.m_original->m_numberOfSequences) { max = c.m_original->m_numberOfSequences; } } m_bufferOriginalSequences.reserve(max); } // Resets the current sweep according to the randomization seed provided. void SequenceRandomizer::Reset(size_t randSeed) { m_rng.seed((unsigned long)randSeed); m_sequenceWindow.clear(); m_randomizedChunkInfo.clear(); m_chunkWindowBegin = 0; m_randomizedWindowEnd = 0; m_randomizationCursor = 0; m_chunkWindowEnd = 0; m_currentChunkCursor = 0; m_currentSequenceCursor = 0; m_currentSampleCursor = 0; // Prepare the chunk for reading RandomizeNextChunkIfNeeded(); } // Gets next randomized sequence descriptions not exceeding the sample count. std::vector SequenceRandomizer::GetNextSequenceDescriptions(size_t sampleCount, ClosedOpenChunkInterval& requiredChunks) { int samples = (int)sampleCount; // Initialize the range to the current chunk. requiredChunks.m_begin = (ChunkIdType)std::min(m_currentChunkCursor, m_randomizedChunks.size() - 1); requiredChunks.m_end = requiredChunks.m_begin + 1; std::vector result; result.reserve(sampleCount); bool firstSequence = true; while (samples > 0 && m_currentChunkCursor < m_randomizedChunks.size()) { size_t sequenceOffsetInsideChunk = m_currentSequenceCursor - m_randomizedChunks[m_currentChunkCursor].m_sequencePositionStart; const RandomizedSequenceDescription* sequence = &m_sequenceWindow[m_currentChunkCursor - m_chunkWindowBegin][sequenceOffsetInsideChunk]; int sequenceLength = (int)sequence->m_numberOfSamples; if (firstSequence || samples >= sequenceLength) { requiredChunks.m_begin = std::min(m_randomizedChunks[m_currentChunkCursor].m_randomizationWindow.m_begin, requiredChunks.m_begin); requiredChunks.m_end = std::max(m_randomizedChunks[m_currentChunkCursor].m_randomizationWindow.m_end, requiredChunks.m_end); firstSequence = false; result.push_back(*sequence); m_currentSequenceCursor++; m_currentSampleCursor += sequenceLength; if (sequenceOffsetInsideChunk + 1 >= m_randomizedChunks[m_currentChunkCursor].m_original->m_numberOfSequences) { // Moving to the next chunk, // Be careful, this invalidates the sequence from above. MoveChunkCursor(); } } // Always decrease the available number of samples. samples -= sequenceLength; } return result; } // Move the chunk cursor to the next chunk, randomizing more sequences if necessary. void SequenceRandomizer::MoveChunkCursor() { m_currentChunkCursor++; RandomizeNextChunkIfNeeded(); // Release chunks that are not needed anymore. ReleaseChunks(); } // Release chunks from the chunk window that are not needed anymore. void SequenceRandomizer::ReleaseChunks() { // We should drop chunks, but firstly make sure that they are not used any more. // That means the sequence description that we have got from the previous call can still be in the BlockRandomizer. size_t currentChunk = std::min(m_currentChunkCursor, m_randomizedChunks.size() - 1); size_t candidateToUnload = m_chunkWindowBegin; size_t releasedChunks = 0; while (candidateToUnload < m_randomizedChunks.size() && candidateToUnload < m_randomizedChunks[currentChunk].m_randomizationWindow.m_begin && m_randomizedChunks[candidateToUnload].m_randomizationWindow.m_end <= m_currentChunkCursor) { m_sequenceWindow.pop_front(); m_randomizedChunkInfo.pop_front(); m_chunkWindowBegin++; candidateToUnload++; releasedChunks++; } if (m_verbosity && 0 < releasedChunks) fprintf(stderr, "SequenceRandomizer::ReleaseChunks(): " "released %" PRIu64 " chunks, now " "chunk window [%" PRIu64 "..%u), cursor %" PRIu64 ", " "randomized window [%" PRIu64 "..%" PRIu64 "), randomization cursor %" PRIu64 "\n", releasedChunks, m_chunkWindowBegin, m_chunkWindowEnd, m_currentChunkCursor, m_chunkWindowBegin, m_randomizedWindowEnd, m_randomizationCursor); } // Randomize one more chunk if needed after the chunk cursor has been incremented. void SequenceRandomizer::RandomizeNextChunkIfNeeded() { if (m_currentChunkCursor < m_randomizedWindowEnd) { assert(m_currentChunkCursor >= m_chunkWindowBegin); return; } assert(m_randomizedWindowEnd == m_currentChunkCursor); if (m_randomizedWindowEnd == m_randomizedChunks.size()) { return; } // Chunk not yet randomized. // of the sample position we have to randomized (current + sampleCount). // We will randomize up to this chunk as the final position of windows end is guaranteed to have been determined // when all sequences up to that chunk have been randomized size_t nextRandomizationCursor = m_randomizedChunks[m_randomizedWindowEnd].m_randomizationWindow.m_end; while (nextRandomizationCursor < m_randomizedChunks.size() && m_randomizedChunks[nextRandomizationCursor].m_randomizationWindow.m_begin <= m_randomizedWindowEnd) { nextRandomizationCursor++; } // Determine the end chunk that we need to load into memory. ChunkIdType nextChunkWindowEnd = m_randomizedChunks[nextRandomizationCursor - 1].m_randomizationWindow.m_end; // Lets page in everything from m_currentRangeEndChunkIndex to endChunkIdx for (ChunkIdType i = m_chunkWindowEnd; i < nextChunkWindowEnd; ++i) { AddRandomizedSequencesForChunk(i); } size_t firstSequencePositionToRandomize = m_randomizationCursor == 0 ? 0 : m_randomizedChunks[m_randomizationCursor - 1].SequenceEndPosition(); size_t endSequencePosToRandomize = m_randomizedChunks[nextRandomizationCursor - 1].SequenceEndPosition(); for (size_t t = firstSequencePositionToRandomize; t < endSequencePosToRandomize; ++t) { // Get valid randomization range, expressed in chunks // TODO: This can be done more efficiently, we know the range of chunks already. const ChunkIdType currentChunkIdx = GetChunkIndexForSequencePosition(t); size_t chunkWindowBegin = m_randomizedChunks[currentChunkIdx].m_randomizationWindow.m_begin; size_t chunkWindowEnd = m_randomizedChunks[currentChunkIdx].m_randomizationWindow.m_end; // Get valid randomization range, expressed in sequence positions. size_t posBegin = m_randomizedChunks[chunkWindowBegin].m_sequencePositionStart; size_t posEnd = m_randomizedChunks[chunkWindowEnd - 1].SequenceEndPosition(); ChunkIdType tChunkIndex = GetChunkIndexForSequencePosition(t); auto& tSequence = GetRandomizedSequenceDescriptionByPosition(tChunkIndex, t); for (;;) { // Pick a sequence position from [posBegin, posEnd) const size_t j = RandMT(posBegin, posEnd, m_rng); // Pick up j sequence. ChunkIdType jChunkIndex = GetChunkIndexForSequencePosition(j); auto& jSequence = GetRandomizedSequenceDescriptionByPosition(jChunkIndex, j); // Try again if the sequence currently at j cannot be placed at position i. if (!IsValidForPosition(tChunkIndex, jSequence)) continue; // Try again if the sequence currently at i cannot be placed at position j. if (!IsValidForPosition(jChunkIndex, tSequence)) continue; // Swap and break out. std::swap(tSequence, jSequence); break; } } // Verify that we got it right for (size_t t = firstSequencePositionToRandomize; t < endSequencePosToRandomize; ++t) { // TODO assert only ChunkIdType tChunkIndex = GetChunkIndexForSequencePosition(t); if (!IsValidForPosition(tChunkIndex, GetRandomizedSequenceDescriptionByPosition(tChunkIndex, t))) { LogicError("SequenceRandomizer::RandomizeNextSequenceDescriptions: randomization logic mangled!"); } } // Let's recalculate number of samples in the randomized chunks for efficient indexing in seek. size_t sampleCount = 0; size_t randomizedChunk = m_randomizedWindowEnd - m_chunkWindowBegin; for (size_t index = 0; index < m_sequenceWindow[randomizedChunk].size(); index++) { sampleCount += m_sequenceWindow[randomizedChunk][index].m_numberOfSamples; } // Save the sample information. ChunkInfo info; info.numberOfSamples = sampleCount; info.start = m_randomizedChunkInfo.empty() ? 0 : m_randomizedChunkInfo.back().start + m_randomizedChunkInfo.back().numberOfSamples; m_randomizedChunkInfo.push_back(info); // Update the cursors. m_randomizedWindowEnd++; m_randomizationCursor = nextRandomizationCursor; m_chunkWindowEnd = nextChunkWindowEnd; if (m_verbosity) fprintf(stderr, "SequenceRandomizer::RandomizeNextChunkIfNeeded(): " "chunk window [%" PRIu64 "..%u), cursor %" PRIu64 ", " "randomized window [%" PRIu64 "..%" PRIu64 "), randomization cursor %" PRIu64 "\n", m_chunkWindowBegin, m_chunkWindowEnd, m_currentChunkCursor, m_chunkWindowBegin, m_randomizedWindowEnd, m_randomizationCursor); } // Sets current cursor to the given sample offset. // If offset is in the middle of the sequence, the next sequence is picked up. // If there is no sequence, an offset outside the sweep is returned. size_t SequenceRandomizer::Seek(size_t sweepSampleOffset, size_t sweep) { // Determine sample range that is randomized within the chunk window. size_t randomizeWindowBeginInSamples = 0; size_t randomizedWindowEndInSamples = 0; if (!m_randomizedChunkInfo.empty()) { randomizeWindowBeginInSamples = m_randomizedChunkInfo.front().start; randomizedWindowEndInSamples = m_randomizedChunkInfo.back().start + m_randomizedChunkInfo.back().numberOfSamples; } if (m_verbosity) fprintf(stderr, "SequenceRandomizer::Seek(): seeking offset %" PRIu64 " in sweep %" PRIu64 "\n", sweepSampleOffset, sweep); if (sweepSampleOffset < randomizeWindowBeginInSamples) { // The requested offset is before the earliest randomized sequences we still have. // Need to start over. if (m_verbosity) fprintf(stderr, "SequenceRandomizer::Seek(): starting over \n"); Reset(sweep); } else if (sweepSampleOffset < randomizedWindowEndInSamples) { // The requested offset is within the randomized window. // We change the current chunk cursor to contain the requested offset. if (m_verbosity) fprintf(stderr, "SequenceRandomizer::Seek(): offset is within randomized window\n"); size_t index; for (index = 0; index < m_randomizedChunkInfo.size(); index++) { if (m_randomizedChunkInfo[index].start <= sweepSampleOffset && sweepSampleOffset < (m_randomizedChunkInfo[index].start + m_randomizedChunkInfo[index].numberOfSamples)) { break; } } assert(index != m_randomizedChunkInfo.size()); m_currentChunkCursor = m_chunkWindowBegin + index; m_currentSequenceCursor = m_randomizedChunks[m_currentChunkCursor].m_sequencePositionStart; m_currentSampleCursor = m_randomizedChunkInfo[index].start; // TODO most of the time, we can advance to the right sequence here // (unless we need to go past the randomized chunk window) } // Advance sequence by sequence until the desire offset is reached. if (m_verbosity) fprintf(stderr, "SequenceRandomizer::Seek(): advancing cursor from %" PRIu64 " to %" PRIu64 "\n", m_currentSampleCursor, sweepSampleOffset); // TODO perhaps optimize this ClosedOpenChunkInterval window; while (m_currentSampleCursor < sweepSampleOffset) { GetNextSequenceDescriptions(1, window); } return m_currentSampleCursor; } // Checks if the randomized sequence is valid for a target chunk. bool SequenceRandomizer::IsValidForPosition(ChunkIdType chunkIndex, const RandomizedSequenceDescription& seqDesc) const { const auto& chunk = m_randomizedChunks[chunkIndex]; return chunk.m_randomizationWindow.m_begin <= seqDesc.m_chunk->m_chunkId && seqDesc.m_chunk->m_chunkId < chunk.m_randomizationWindow.m_end; } // Gets randomized chunk index using a sequence position in the sweep. ChunkIdType SequenceRandomizer::GetChunkIndexForSequencePosition(size_t sequencePosition) const { auto result = std::upper_bound( m_randomizedChunks.begin(), m_randomizedChunks.end(), sequencePosition, [](size_t sp, const RandomizedChunk& c) { return sp < c.m_sequencePositionStart; }); return (ChunkIdType)(result - 1 - m_randomizedChunks.begin()); } // Add randomizes sequences for the chunk with a given index. void SequenceRandomizer::AddRandomizedSequencesForChunk(ChunkIdType chunkIdx) { assert(chunkIdx == m_chunkWindowEnd); const RandomizedChunk& chunk = m_randomizedChunks[chunkIdx]; std::vector chunkSequences; m_bufferOriginalSequences.clear(); m_deserializer->GetSequencesForChunk(chunk.m_original->m_id, m_bufferOriginalSequences); chunkSequences.reserve(m_bufferOriginalSequences.size()); for (size_t k = 0; k < m_bufferOriginalSequences.size(); k++) { RandomizedSequenceDescription s; s.m_id = m_bufferOriginalSequences[k].m_id; s.m_numberOfSamples = m_bufferOriginalSequences[k].m_numberOfSamples; s.m_chunk = &chunk; chunkSequences.push_back(s); } m_sequenceWindow.push_back(std::move(chunkSequences)); m_chunkWindowEnd++; } // Gets randomized sequence by the sequence position in the sweep and randomized chunk index. RandomizedSequenceDescription& SequenceRandomizer::GetRandomizedSequenceDescriptionByPosition(ChunkIdType chunkIndex, size_t sequenceSweepPosition) { size_t sequenceOffsetInsideChunk = sequenceSweepPosition - m_randomizedChunks[chunkIndex].m_sequencePositionStart; return m_sequenceWindow[chunkIndex - m_chunkWindowBegin][sequenceOffsetInsideChunk]; } }}}