swh:1:snp:f50ab94432af916b5fb8b4ad831e8dddded77084
Raw File
Tip revision: 491514602c4bf7fb9478b4e5cd7dd0d579ae03aa authored by Leonardo Nunes on 14 March 2018, 19:05:13 UTC
Removing hard tabs.
Tip revision: 4915146
SequenceRandomizer.cpp
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//

#define _CRT_SECURE_NO_WARNINGS

#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include "SequenceRandomizer.h"
#include <algorithm>
#include <utility>
#include <deque>
#include "RandomOrdering.h"

namespace CNTK {

    SequenceRandomizer::SequenceRandomizer(
        int verbosity,
        DataDeserializerPtr deserializer,
        ChunkRandomizerPtr chunkRandomizer)
        : m_verbosity(verbosity),
        m_randomizedChunks(chunkRandomizer->GetRandomizedChunks()),
        m_chunkWindowBegin(0),
        m_randomizedWindowEnd(0),
        m_randomizationCursor(0),
        m_chunkWindowEnd(0),
        m_currentSequenceCursor(0),
        m_currentChunkCursor(0),
        m_currentSampleCursor(0),
        m_deserializer(deserializer)
    {
        size_t max = 0;
        for (const auto& c : m_randomizedChunks)
        {
            if (max < c.m_original->m_numberOfSequences)
            {
                max = c.m_original->m_numberOfSequences;
            }
        }

        m_bufferOriginalSequences.reserve(max);
    }

    // Resets the current sweep according to the randomization seed provided.
    void SequenceRandomizer::Reset(size_t randSeed)
    {
        m_rng.seed((unsigned long)randSeed);

        m_sequenceWindow.clear();
        m_randomizedChunkInfo.clear();

        m_chunkWindowBegin = 0;
        m_randomizedWindowEnd = 0;
        m_randomizationCursor = 0;
        m_chunkWindowEnd = 0;

        m_currentChunkCursor = 0;
        m_currentSequenceCursor = 0;
        m_currentSampleCursor = 0;

        // Prepare the chunk for reading
        RandomizeNextChunkIfNeeded();
    }

    // Repeatedly invokes provided callback passing to it sequence descriptors from the randomized
    // timeline until the callback returns false. On each successful invocation, it advances the
    // current position (cursor) in the timeline.
    void SequenceRandomizer::GetNextSequenceDescriptions(
        const std::function<bool(const RandomizedSequenceDescription&)>& callback,
        ClosedOpenChunkInterval& requiredChunks)
    {
        // Initialize the range to the current chunk.
        requiredChunks.m_begin = (ChunkIdType)std::min(m_currentChunkCursor, m_randomizedChunks.size() - 1);
        requiredChunks.m_end = requiredChunks.m_begin + 1;

        while (m_currentChunkCursor < m_randomizedChunks.size())
        {
            size_t sequenceOffsetInsideChunk = m_currentSequenceCursor - m_randomizedChunks[m_currentChunkCursor].m_sequencePositionStart;
            RandomizedSequenceDescription& sequence = m_sequenceWindow[m_currentChunkCursor - m_chunkWindowBegin][sequenceOffsetInsideChunk];

            if (!callback(sequence))
                break;

            // Update the required chunk window.
            requiredChunks.m_begin = std::min(m_randomizedChunks[m_currentChunkCursor].m_randomizationWindow.m_begin, requiredChunks.m_begin);
            requiredChunks.m_end = std::max(m_randomizedChunks[m_currentChunkCursor].m_randomizationWindow.m_end, requiredChunks.m_end);

            // Update current cursor to the next sequence.
            m_currentSequenceCursor++;
            m_currentSampleCursor += sequence.m_numberOfSamples;
            if (sequenceOffsetInsideChunk + 1 >= m_randomizedChunks[m_currentChunkCursor].m_original->m_numberOfSequences)
            {
                // Moving to the next chunk,
                // Be careful, this invalidates the sequence from above.
                MoveChunkCursor();
            }
        }
    }

    // Move the chunk cursor to the next chunk, randomizing more sequences if necessary.
    void SequenceRandomizer::MoveChunkCursor()
    {
        m_currentChunkCursor++;
        RandomizeNextChunkIfNeeded();

        // Release chunks that are not needed anymore.
        ReleaseChunks();
    }

    // Release chunks from the chunk window that are not needed anymore.
    void SequenceRandomizer::ReleaseChunks()
    {
        // We should drop chunks, but firstly make sure that they are not used any more.
        // That means the sequence description that we have got from the previous call can still be in the BlockRandomizer.
        size_t currentChunk = std::min(m_currentChunkCursor, m_randomizedChunks.size() - 1);
        size_t candidateToUnload = m_chunkWindowBegin;
        size_t releasedChunks = 0;
        while (candidateToUnload < m_randomizedChunks.size() &&
               candidateToUnload < m_randomizedChunks[currentChunk].m_randomizationWindow.m_begin &&
               m_randomizedChunks[candidateToUnload].m_randomizationWindow.m_end <= m_currentChunkCursor)
        {
            m_sequenceWindow.pop_front();
            m_randomizedChunkInfo.pop_front();
            m_chunkWindowBegin++;
            candidateToUnload++;
            releasedChunks++;
        }

        if (m_verbosity && 0 < releasedChunks)
            fprintf(stderr,
                "SequenceRandomizer::ReleaseChunks(): "
                "released %" PRIu64 " chunks, now "
                "chunk window [%" PRIu64 "..%u), cursor %" PRIu64 ", "
                "randomized window [%" PRIu64 "..%" PRIu64 "), randomization cursor %" PRIu64 "\n",
                releasedChunks,
                m_chunkWindowBegin, m_chunkWindowEnd,
                m_currentChunkCursor,
                m_chunkWindowBegin, m_randomizedWindowEnd,
                m_randomizationCursor);
    }

    // Randomize one more chunk if needed after the chunk cursor has been incremented.
    void SequenceRandomizer::RandomizeNextChunkIfNeeded()
    {
        if (m_currentChunkCursor < m_randomizedWindowEnd)
        {
            assert(m_currentChunkCursor >= m_chunkWindowBegin);
            return;
        }
        assert(m_randomizedWindowEnd == m_currentChunkCursor);

        if (m_randomizedWindowEnd == m_randomizedChunks.size())
        {
            return;
        }

        // Chunk not yet randomized.
        // of the sample position we have to randomized (current + sampleCount).
        // We will randomize up to this chunk as the final position of windows end is guaranteed to have been determined
        // when all sequences up to that chunk have been randomized
        size_t nextRandomizationCursor = m_randomizedChunks[m_randomizedWindowEnd].m_randomizationWindow.m_end;
        while (nextRandomizationCursor < m_randomizedChunks.size() &&
               m_randomizedChunks[nextRandomizationCursor].m_randomizationWindow.m_begin <= m_randomizedWindowEnd)
        {
            nextRandomizationCursor++;
        }

        // Determine the end chunk that we need to load into memory.
        ChunkIdType nextChunkWindowEnd = m_randomizedChunks[nextRandomizationCursor - 1].m_randomizationWindow.m_end;

        // Lets page in everything from m_currentRangeEndChunkIndex to endChunkIdx
        for (ChunkIdType i = m_chunkWindowEnd; i < nextChunkWindowEnd; ++i)
        {
            AddRandomizedSequencesForChunk(i);
        }

        size_t firstSequencePositionToRandomize =
            m_randomizationCursor == 0 ? 0 : m_randomizedChunks[m_randomizationCursor - 1].SequenceEndPosition();

        size_t endSequencePosToRandomize = m_randomizedChunks[nextRandomizationCursor - 1].SequenceEndPosition();
        for (size_t t = firstSequencePositionToRandomize; t < endSequencePosToRandomize; ++t)
        {
            // Get valid randomization range, expressed in chunks
            // TODO: This can be done more efficiently, we know the range of chunks already.
            const ChunkIdType currentChunkIdx = GetChunkIndexForSequencePosition(t);

            size_t chunkWindowBegin = m_randomizedChunks[currentChunkIdx].m_randomizationWindow.m_begin;
            size_t chunkWindowEnd = m_randomizedChunks[currentChunkIdx].m_randomizationWindow.m_end;

            // Get valid randomization range, expressed in sequence positions.
            size_t posBegin = m_randomizedChunks[chunkWindowBegin].m_sequencePositionStart;
            size_t posEnd = m_randomizedChunks[chunkWindowEnd - 1].SequenceEndPosition();

            ChunkIdType tChunkIndex = GetChunkIndexForSequencePosition(t);
            auto& tSequence = GetRandomizedSequenceDescriptionByPosition(tChunkIndex, t);

            for (;;)
            {
                // Pick a sequence position from [posBegin, posEnd)
                const size_t j = Microsoft::MSR::CNTK::RandMT(posBegin, posEnd, m_rng);

                // Pick up j sequence.
                ChunkIdType jChunkIndex = GetChunkIndexForSequencePosition(j);
                auto& jSequence = GetRandomizedSequenceDescriptionByPosition(jChunkIndex, j);

                // Try again if the sequence currently at j cannot be placed at position i.
                if (!IsValidForPosition(tChunkIndex, jSequence))
                    continue;

                // Try again if the sequence currently at i cannot be placed at position j.
                if (!IsValidForPosition(jChunkIndex, tSequence))
                    continue;

                // Swap and break out.
                std::swap(tSequence, jSequence);
                break;
            }
        }

        // Verify that we got it right
        for (size_t t = firstSequencePositionToRandomize; t < endSequencePosToRandomize; ++t)
        {
            // TODO assert only
            ChunkIdType tChunkIndex = GetChunkIndexForSequencePosition(t);
            if (!IsValidForPosition(tChunkIndex, GetRandomizedSequenceDescriptionByPosition(tChunkIndex, t)))
            {
                LogicError("SequenceRandomizer::RandomizeNextSequenceDescriptions: randomization logic mangled!");
            }
        }

        // Let's recalculate number of samples in the randomized chunks for efficient indexing in seek.
        size_t sampleCount = 0;
        size_t randomizedChunk = m_randomizedWindowEnd - m_chunkWindowBegin;
        for (size_t index = 0; index < m_sequenceWindow[randomizedChunk].size(); index++)
        {
            sampleCount += m_sequenceWindow[randomizedChunk][index].m_numberOfSamples;
        }

        // Save the sample information.
        ChunkInfo info;
        info.numberOfSamples = sampleCount;
        info.start = m_randomizedChunkInfo.empty() ? 0 : m_randomizedChunkInfo.back().start + m_randomizedChunkInfo.back().numberOfSamples;
        m_randomizedChunkInfo.push_back(info);

        // Update the cursors.
        m_randomizedWindowEnd++;
        m_randomizationCursor = nextRandomizationCursor;
        m_chunkWindowEnd = nextChunkWindowEnd;

        if (m_verbosity)
            fprintf(stderr,
                "SequenceRandomizer::RandomizeNextChunkIfNeeded(): "
                "chunk window [%" PRIu64 "..%u), cursor %" PRIu64 ", "
                "randomized window [%" PRIu64 "..%" PRIu64 "), randomization cursor %" PRIu64 "\n",
                m_chunkWindowBegin, m_chunkWindowEnd,
                m_currentChunkCursor,
                m_chunkWindowBegin, m_randomizedWindowEnd,
                m_randomizationCursor);
    }

    // Sets current cursor to the given sample offset.
    // If offset is in the middle of the sequence, the next sequence is picked up.
    // If there is no sequence, an offset outside the sweep is returned.
    size_t SequenceRandomizer::Seek(size_t sweepSampleOffset, size_t sweep)
    {
        // Determine sample range that is randomized within the chunk window.
        size_t randomizeWindowBeginInSamples = 0;
        size_t randomizedWindowEndInSamples = 0;
        if (!m_randomizedChunkInfo.empty())
        {
            randomizeWindowBeginInSamples = m_randomizedChunkInfo.front().start;
            randomizedWindowEndInSamples = m_randomizedChunkInfo.back().start + m_randomizedChunkInfo.back().numberOfSamples;
        }

        if (m_verbosity)
            fprintf(stderr, "SequenceRandomizer::Seek(): seeking offset %" PRIu64 " in sweep %" PRIu64 "\n",
                sweepSampleOffset,
                sweep);

        if (sweepSampleOffset < randomizeWindowBeginInSamples)
        {
            // The requested offset is before the earliest randomized sequences we still have.
            // Need to start over.
            if (m_verbosity)
                fprintf(stderr, "SequenceRandomizer::Seek(): starting over \n");

            Reset(sweep);
        }
        else if (sweepSampleOffset < randomizedWindowEndInSamples)
        {
            // The requested offset is within the randomized window.
            // We change the current chunk cursor to contain the requested offset.
            if (m_verbosity)
                fprintf(stderr, "SequenceRandomizer::Seek(): offset is within randomized window\n");
            size_t index;
            for (index = 0; index < m_randomizedChunkInfo.size(); index++)
            {
                if (m_randomizedChunkInfo[index].start <= sweepSampleOffset &&
                    sweepSampleOffset < (m_randomizedChunkInfo[index].start + m_randomizedChunkInfo[index].numberOfSamples))
                {
                    break;
                }
            }
            assert(index != m_randomizedChunkInfo.size());

            m_currentChunkCursor = m_chunkWindowBegin + index;
            m_currentSequenceCursor = m_randomizedChunks[m_currentChunkCursor].m_sequencePositionStart;
            m_currentSampleCursor = m_randomizedChunkInfo[index].start;

            // TODO most of the time, we can advance to the right sequence here
            // (unless we need to go past the randomized chunk window)
        }

        // Advance sequence by sequence until the desire offset is reached.
        if (m_verbosity)
            fprintf(stderr, "SequenceRandomizer::Seek(): advancing cursor from %" PRIu64 " to %" PRIu64 "\n",
                m_currentSampleCursor,
                sweepSampleOffset);

        ClosedOpenChunkInterval window;
        GetNextSequenceDescriptions([&](const RandomizedSequenceDescription&) { return m_currentSampleCursor < sweepSampleOffset; }, window);

        return m_currentSampleCursor;
    }

    // Checks if the randomized sequence is valid for a target chunk.
    bool SequenceRandomizer::IsValidForPosition(ChunkIdType chunkIndex, const RandomizedSequenceDescription& seqDesc) const
    {
        const auto& chunk = m_randomizedChunks[chunkIndex];
        return chunk.m_randomizationWindow.m_begin <= seqDesc.m_chunk->m_chunkId && seqDesc.m_chunk->m_chunkId < chunk.m_randomizationWindow.m_end;
    }

    // Gets randomized chunk index using a sequence position in the sweep.
    ChunkIdType SequenceRandomizer::GetChunkIndexForSequencePosition(size_t sequencePosition) const
    {
        auto result = std::upper_bound(
            m_randomizedChunks.begin(),
            m_randomizedChunks.end(),
            sequencePosition,
            [](size_t sp, const RandomizedChunk& c) { return sp < c.m_sequencePositionStart; });
        return (ChunkIdType)(result - 1 - m_randomizedChunks.begin());
    }

    // Add randomizes sequences for the chunk with a given index.
    void SequenceRandomizer::AddRandomizedSequencesForChunk(ChunkIdType chunkIdx)
    {
        assert(chunkIdx == m_chunkWindowEnd);

        const RandomizedChunk& chunk = m_randomizedChunks[chunkIdx];
        std::vector<RandomizedSequenceDescription> chunkSequences;

        m_bufferOriginalSequences.clear();
        m_deserializer->SequenceInfosForChunk(chunk.m_original->m_id, m_bufferOriginalSequences);
        chunkSequences.reserve(m_bufferOriginalSequences.size());
        for (size_t k = 0; k < m_bufferOriginalSequences.size(); k++)
        {
            RandomizedSequenceDescription s;
            s.m_indexInOriginalChunk = m_bufferOriginalSequences[k].m_indexInChunk;
            s.m_numberOfSamples = m_bufferOriginalSequences[k].m_numberOfSamples;
            s.m_chunk = &chunk;
            chunkSequences.push_back(s);
        }

        m_sequenceWindow.push_back(std::move(chunkSequences));
        m_chunkWindowEnd++;
    }

    // Gets randomized sequence by the sequence position in the sweep and randomized chunk index.
    RandomizedSequenceDescription& SequenceRandomizer::GetRandomizedSequenceDescriptionByPosition(ChunkIdType chunkIndex, size_t sequenceSweepPosition)
    {
        size_t sequenceOffsetInsideChunk = sequenceSweepPosition - m_randomizedChunks[chunkIndex].m_sequencePositionStart;
        return m_sequenceWindow[chunkIndex - m_chunkWindowBegin][sequenceOffsetInsideChunk];
    }
}
back to top