swh:1:snp:f50ab94432af916b5fb8b4ad831e8dddded77084
Tip revision: d67669c298a7bdc3668437b3d0c332e454fce414 authored by Amit Agarwal on 01 April 2016, 22:16:52 UTC
Library
Library
Tip revision: d67669c
BlockRandomizer.cpp
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
#define _CRT_SECURE_NO_WARNINGS
#include "BlockRandomizer.h"
#include <algorithm>
#include <utility>
#include <iostream>
#include "DataReader.h"
#include <random>
namespace Microsoft { namespace MSR { namespace CNTK {
// TODO: This is an old code, used for legacy randomization to make sure to preserve the same behavior for the tests.
static inline size_t rand(const size_t begin, const size_t end)
{
// still only covers 32-bit range
const size_t randomNumber = ::rand() * RAND_MAX + ::rand();
return begin + randomNumber % (end - begin);
}
// TODO: This is an old code, used for legacy randomization to make sure to preserve the same behavior for the tests.
// TODO: Will be removed after more testing of the new functionality is done, currently the set of tests is limited.
// Shuffle a vector into random order by randomly swapping elements.
template <typename TVector>
void RandomShuffle(TVector& v, size_t randomSeed)
{
if (v.size() > RAND_MAX * static_cast<size_t>(RAND_MAX))
{
RuntimeError("RandomShuffle: too large set: need to change to different random generator!");
}
srand((unsigned int)randomSeed);
foreach_index (currentLocation, v)
{
// Pick a random location a location and swap with current
const size_t randomLocation = rand(0, v.size());
std::swap(v[currentLocation], v[randomLocation]);
}
}
bool BlockRandomizer::TimelineIsValidForRandomization(const SequenceDescriptions& timeline) const
{
SequenceDescription previous = { SIZE_MAX, 0, 0, true };
auto it = std::find_if_not(timeline.begin(), timeline.end(),
[&](const SequenceDescription* current)
{
bool result = current->m_isValid
&& previous.m_id + 1 == current->m_id
&& previous.m_chunkId <= current->m_chunkId
&& current->m_chunkId <= previous.m_chunkId + 1
&& 0 < current->m_numberOfSamples;
previous = *current;
return result;
});
return it == timeline.end();
}
void BlockRandomizer::RandomizeChunks()
{
// Create vector of chunk indices and shuffle them using current sweep as seed
std::vector<size_t> randomizedChunkIndices;
randomizedChunkIndices.reserve(m_numChunks);
for (size_t i = 0; i < m_numChunks; i++)
{
randomizedChunkIndices.push_back(i);
}
if (m_useLegacyRandomization)
{
RandomShuffle(randomizedChunkIndices, m_sweep);
}
else
{
std::mt19937 m_rng((int)m_sweep);
std::shuffle(randomizedChunkIndices.begin(), randomizedChunkIndices.end(), m_rng);
}
// Place randomized chunks on global time line
m_randomizedChunks.clear();
m_randomizedChunks.reserve(m_numChunks + 1);
size_t chunkId, samplePosition, sequencePosition;
for (chunkId = 0, samplePosition = m_sweepStartInSamples, sequencePosition = 0; chunkId < m_numChunks; chunkId++)
{
const size_t originalChunkIndex = randomizedChunkIndices[chunkId];
const size_t numSequences =
m_chunkInformation[originalChunkIndex + 1].m_sequencePositionStart -
m_chunkInformation[originalChunkIndex].m_sequencePositionStart;
const size_t numSamples =
m_chunkInformation[originalChunkIndex + 1].m_samplePositionStart -
m_chunkInformation[originalChunkIndex].m_samplePositionStart;
m_randomizedChunks.push_back(RandomizedChunk{ sequencePosition, samplePosition, originalChunkIndex });
samplePosition += numSamples;
sequencePosition += numSequences;
}
// Add sentinel
m_randomizedChunks.push_back(RandomizedChunk{ sequencePosition, samplePosition, SIZE_MAX });
// For each chunk, compute the randomization range (w.r.t. the randomized chunk sequence)
size_t halfWindowRange = m_randomizationRangeInSamples / 2;
for (size_t chunkId = 0; chunkId < m_numChunks; chunkId++)
{
auto& chunk = m_randomizedChunks[chunkId];
// start with the range of left neighbor
if (chunkId == 0)
{
chunk.m_windowBegin = 0;
chunk.m_windowEnd = 1;
}
else
{
chunk.m_windowBegin = m_randomizedChunks[chunkId - 1].m_windowBegin; // might be too early
chunk.m_windowEnd = m_randomizedChunks[chunkId - 1].m_windowEnd; // might have more space
}
while (chunk.m_info.m_samplePositionStart - m_randomizedChunks[chunk.m_windowBegin].m_info.m_samplePositionStart > halfWindowRange)
chunk.m_windowBegin++; // too early
// TODO m_randomizedChunks[chunk.windowend + 1].info.samplePositionStart - m_randomizedChunks[chunk.windowbegin].info.samplePositionStart < m_randomizationRangeInSamples
chunk.m_windowEnd = std::max(chunk.m_windowEnd, chunk.m_windowBegin + 1);
while (chunk.m_windowEnd < m_numChunks &&
m_randomizedChunks[chunk.m_windowEnd + 1].m_info.m_samplePositionStart - chunk.m_info.m_samplePositionStart < halfWindowRange)
chunk.m_windowEnd++; // got more space
}
}
// TODO: Profile and eliminate PositionConverter, better convert sequencePosition to RandomizedChunk
// once.
size_t BlockRandomizer::GetChunkIndexForSequencePosition(size_t sequencePosition) const
{
assert(sequencePosition <= m_numSamples);
struct PositionConverter
{
size_t m_position;
PositionConverter(const RandomizedChunk & chunk) : m_position(chunk.m_info.m_sequencePositionStart) {};
PositionConverter(size_t sequencePosition) : m_position(sequencePosition) {};
};
auto result = std::lower_bound(m_randomizedChunks.begin(), m_randomizedChunks.end(), sequencePosition,
[](const PositionConverter& a, const PositionConverter& b)
{
return a.m_position <= b.m_position;
});
return result - m_randomizedChunks.begin() - 1;
}
bool BlockRandomizer::IsValidForPosition(size_t targetPosition, const SequenceDescription& seqDesc) const
{
const auto& chunk = m_randomizedChunks[GetChunkIndexForSequencePosition(targetPosition)];
return chunk.m_windowBegin <= seqDesc.m_chunkId && seqDesc.m_chunkId < chunk.m_windowEnd;
}
void BlockRandomizer::Randomize()
{
const auto& timeline = m_deserializer->GetSequenceDescriptions();
RandomizeChunks();
// Set up m_randomTimeline, shuffled by chunks.
m_randomTimeline.clear();
m_randomTimeline.reserve(m_numSequences);
for (size_t chunkId = 0; chunkId < m_numChunks; chunkId++)
{
auto originalChunkIndex = m_randomizedChunks[chunkId].m_originalChunkIndex;
for (size_t sequencePosition = m_chunkInformation[originalChunkIndex].m_sequencePositionStart;
sequencePosition < m_chunkInformation[originalChunkIndex + 1].m_sequencePositionStart;
sequencePosition++)
{
SequenceDescription randomizedSeqDesc = *timeline[sequencePosition];
randomizedSeqDesc.m_chunkId = chunkId;
m_randomTimeline.push_back(randomizedSeqDesc);
}
}
assert(m_randomTimeline.size() == m_numSequences);
// Check we got those setup right
foreach_index (i, m_randomTimeline)
{
assert(IsValidForPosition(i, m_randomTimeline[i]));
}
// Now randomly shuffle m_randomTimeline, while considering the
// constraints of what chunk range needs to be in memory.
srand((unsigned int)(m_sweep + 1));
foreach_index (i, m_randomTimeline)
{
// Get valid randomization range, expressed in chunks
const size_t chunkId = GetChunkIndexForSequencePosition(i);
const size_t windowBegin = m_randomizedChunks[chunkId].m_windowBegin;
const size_t windowEnd = m_randomizedChunks[chunkId].m_windowEnd;
// Get valid randomization range, expressed in sequence positions.
size_t posBegin = m_randomizedChunks[windowBegin].m_info.m_sequencePositionStart;
size_t posEnd = m_randomizedChunks[windowEnd].m_info.m_sequencePositionStart;
for (;;)
{
// Pick a sequence position from [posBegin, posEnd)
const size_t j = rand(posBegin, posEnd);
// Try again if the sequence currently at j cannot be placed at position i.
if (!IsValidForPosition(i, m_randomTimeline[j]))
continue;
// Try again if the sequence currently at i cannot be placed at position j.
if (!IsValidForPosition(j, m_randomTimeline[i]))
continue;
// Swap and break out.
std::swap(m_randomTimeline[i], m_randomTimeline[j]); // TODO old swap was perhaps more efficient
break;
}
}
// Verify that we got it right
foreach_index (i, m_randomTimeline)
{
// TODO assert only
if (!IsValidForPosition(i, m_randomTimeline[i]))
LogicError("BlockRandomizer::Randomize: randomization logic mangled!");
}
}
// Randomizes if new sweep of the data is needed.
// Returns true in case when randomization happend and false if the end of the current
// sweep has not yet been reached (no randomization took place).
bool BlockRandomizer::RandomizeIfNewSweepIsEntered()
{
// Check that StartEpoch() was called
assert(m_sequencePositionInSweep != SIZE_MAX);
if (m_sequencePositionInSweep >= m_numSequences)
{
if (m_verbosity > 0)
std::cerr << __FUNCTION__ << ": re-randomizing for sweep " << m_sweep
<< " in " << (m_frameMode ? "frame" : "utterance") << " mode" << endl;
m_sweep++;
m_sweepStartInSamples += m_numSamples;
Randomize();
m_sequencePositionInSweep -= m_numSequences;
assert(m_sequencePositionInSweep < m_numSequences); // cannot jump ahead more than a sweep
return true;
};
return false;
}
void BlockRandomizer::RandomizeForGlobalSamplePosition(const size_t samplePosition)
{
size_t sweep = samplePosition / m_numSamples;
if (m_sweep != sweep)
{
m_sweep = sweep;
m_sweepStartInSamples = sweep * m_numSamples;
Randomize();
}
m_sequencePositionInSweep = samplePosition % m_numSamples; // TODO only for m_frameMode
};
//
// Public methods
//
BlockRandomizer::BlockRandomizer(int verbosity,
size_t randomizationRangeInSamples,
IDataDeserializerPtr deserializer,
DistributionMode distributionMode,
bool useLegacyRandomization) :
m_verbosity(verbosity),
m_randomizationRangeInSamples(randomizationRangeInSamples),
m_deserializer(deserializer),
m_distributionMode(distributionMode),
m_useLegacyRandomization(useLegacyRandomization),
m_sweep(SIZE_MAX),
m_sequencePositionInSweep(SIZE_MAX),
m_samplePositionInEpoch(SIZE_MAX),
m_epochSize(SIZE_MAX)
{
assert(deserializer != nullptr);
const SequenceDescriptions& timeline = m_deserializer->GetSequenceDescriptions();
assert(TimelineIsValidForRandomization(timeline));
if (timeline.size() == 0)
{
m_numSequences = 0;
m_numChunks = 0;
}
else
{
// TODO let timeline keep this info?
m_numSequences = timeline.back()->m_id + 1;
m_numChunks = timeline.back()->m_chunkId + 1;
}
// Generate additional information about physical chunks
assert(m_chunkInformation.size() == 0);
m_chunkInformation.reserve(m_numChunks + 1);
m_chunkInformation.insert(m_chunkInformation.begin(),
m_numChunks + 1,
ChunkInformation{ SIZE_MAX, SIZE_MAX });
size_t maxNumberOfSamples = 0;
m_numSamples = 0;
for (const auto& seqDesc : timeline)
{
// TODO let timeline keep this info?
auto& chunkInformation = m_chunkInformation[seqDesc->m_chunkId];
chunkInformation.m_sequencePositionStart =
min(chunkInformation.m_sequencePositionStart, seqDesc->m_id);
chunkInformation.m_samplePositionStart =
min(chunkInformation.m_samplePositionStart, m_numSamples);
maxNumberOfSamples = max(maxNumberOfSamples, seqDesc->m_numberOfSamples);
m_numSamples += seqDesc->m_numberOfSamples;
}
// Add sentinel
m_chunkInformation[m_numChunks] = { m_numSequences, m_numSamples };
// Frame mode to the randomizer just means there are only single-sample sequences
m_frameMode = (maxNumberOfSamples == 1);
m_streams = m_deserializer->GetStreamDescriptions();
}
void BlockRandomizer::Initialize(TransformerPtr next, const ConfigParameters& readerConfig)
{
// Not used for the block randomizer.
UNUSED(next);
UNUSED(readerConfig);
}
void BlockRandomizer::StartEpoch(const EpochConfiguration& config)
{
m_workerRank = config.m_workerRank;
m_numberOfWorkers = config.m_numberOfWorkers;
// eldak: check partial minibatches.
if (config.m_totalEpochSizeInSamples == requestDataSize)
{
m_epochSize = m_numSamples;
}
else
{
m_epochSize = config.m_totalEpochSizeInSamples;
}
// TODO add some asserts on EpochConfiguration
m_samplePositionInEpoch = 0;
size_t timeframe = m_epochSize * config.m_epochIndex;
assert(m_frameMode); // TODO !m_frameMode needs fixes
assert(timeframe != SIZE_MAX); // used as special value for init
RandomizeForGlobalSamplePosition(timeframe);
};
bool BlockRandomizer::GetNextSequenceIds(size_t sampleCount, std::vector<size_t>& originalIds, std::unordered_set<size_t>& originalChunks)
{
assert(m_frameMode); // TODO !m_frameMode not implemented yet
assert(originalIds.size() == 0);
assert(originalChunks.size() == 0);
assert(sampleCount <= m_numSamples);
if (m_samplePositionInEpoch < m_epochSize)
{
if (m_distributionMode == DistributionMode::chunk_modulus)
{
size_t distributedSampleCount = 0;
while ((m_samplePositionInEpoch < m_epochSize) &&
(distributedSampleCount < sampleCount))
{
if (RandomizeIfNewSweepIsEntered() && 0 < distributedSampleCount)
{
// Minibatch ends on sweep boundary.
// TODO matches old behavior, consider changing; make configurable
break;
}
const auto& seqDesc = m_randomTimeline[m_sequencePositionInSweep];
if ((seqDesc.m_chunkId % m_numberOfWorkers) == m_workerRank)
{
// Got one, collect it (and its window of chunks)
originalIds.push_back(seqDesc.m_id);
const auto & currentChunk = m_randomizedChunks[GetChunkIndexForSequencePosition(seqDesc.m_id)];
const size_t windowBegin = currentChunk.m_windowBegin;
const size_t windowEnd = currentChunk.m_windowEnd;
for (size_t chunk = windowBegin; chunk < windowEnd; chunk++)
{
if ((chunk % m_numberOfWorkers) == m_workerRank)
{
originalChunks.insert(m_randomizedChunks[chunk].m_originalChunkIndex);
}
}
}
m_samplePositionInEpoch += seqDesc.m_numberOfSamples;
m_sequencePositionInSweep++;
distributedSampleCount++;
}
}
else
{
assert(m_distributionMode == DistributionMode::sequences_strides);
size_t nextSamplePositionInEpoch = std::min(m_epochSize, m_samplePositionInEpoch + sampleCount);
size_t distributedSampleCount = nextSamplePositionInEpoch - m_samplePositionInEpoch;
size_t strideBegin = distributedSampleCount * m_workerRank / m_numberOfWorkers;
size_t strideEnd = distributedSampleCount * (m_workerRank + 1) / m_numberOfWorkers;
for (size_t i = 0; i < distributedSampleCount; ++i, ++m_samplePositionInEpoch, ++m_sequencePositionInSweep)
{
RandomizeIfNewSweepIsEntered(); // TODO return value ignored here?
if (strideBegin <= i && i < strideEnd)
{
const auto& seqDesc = m_randomTimeline[m_sequencePositionInSweep];
originalIds.push_back(seqDesc.m_id);
const auto & currentChunk = m_randomizedChunks[GetChunkIndexForSequencePosition(m_sequencePositionInSweep)];
const size_t windowBegin = currentChunk.m_windowBegin;
const size_t windowEnd = currentChunk.m_windowEnd;
for (size_t chunk = windowBegin; chunk < windowEnd; chunk++)
{
originalChunks.insert(m_randomizedChunks[chunk].m_originalChunkIndex);
}
}
}
assert(m_samplePositionInEpoch == nextSamplePositionInEpoch);
}
}
return m_epochSize <= m_samplePositionInEpoch;
}
Sequences BlockRandomizer::GetNextSequences(size_t sampleCount)
{
assert(m_frameMode); // TODO sequence mode not implemented yet
assert(m_samplePositionInEpoch != SIZE_MAX); // SetEpochConfiguration() must be called first
std::vector<size_t> originalIds;
std::unordered_set<size_t> originalChunks;
Sequences result;
result.m_endOfEpoch = GetNextSequenceIds(sampleCount, originalIds, originalChunks);
if (originalIds.size() == 0)
{
return result;
}
// Require and release chunks from the data deserializer
for (size_t originalChunkIndex = 0; originalChunkIndex < m_numChunks; originalChunkIndex++)
{
if (originalChunks.find(originalChunkIndex) != originalChunks.end())
{
if (m_chunks.find(originalChunkIndex) == m_chunks.end())
{
m_chunks[originalChunkIndex] = m_deserializer->GetChunk(originalChunkIndex);
}
}
else
{
m_chunks.erase(originalChunkIndex);
}
}
const auto& originalTimeline = m_deserializer->GetSequenceDescriptions();
result.m_data.resize(m_streams.size(), std::vector<SequenceDataPtr>(originalIds.size()));
// TODO: This will be changed, when we move transformers under the randomizer.
// TODO: Randomizer won't should not deal with multithreading.
#pragma omp parallel for ordered schedule(dynamic)
for (int i = 0; i < originalIds.size(); ++i)
{
const auto& sequenceDescription = originalTimeline[originalIds[i]];
auto sequence = m_chunks[sequenceDescription->m_chunkId]->GetSequence(originalIds[i]);
for (int j = 0; j < m_streams.size(); ++j)
{
result.m_data[j][i] = sequence[j];
}
}
return result;
};
}}}