rollingwindowsource.h
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
// rollingwindowsource.h -- implementation of a rolling-window minibatch source ('minibatchframesource') with a disk page file
//
#pragma once
#include "Basics.h" // for attempt()
#ifdef _WIN32
#include "numahelpers.h" // for NUMA allocation
#endif
#include "minibatchsourcehelpers.h"
#include "minibatchiterator.h"
#include "biggrowablevectors.h"
#include "ssematrix.h"
#include "RandomOrdering.h"
namespace msra { namespace dbn {
// ---------------------------------------------------------------------------
// biggrowablevectorarray -- a big array of vectors for features, growable (push_back)
// Data is striped across NUMA nodes, as to not clog them up.
// This also supports paging to disk, which is used for the old minibatchframesource.
// ---------------------------------------------------------------------------
class biggrowablevectorarray : public growablevectorbase<msra::dbn::matrix>
{
size_t m; // dim
size_t inmembegin; // range we have in memory, rounded to enclosing blocks (not rounded at end)
size_t inmemend;
std::wstring pagepath; // path for paging, empty if no paging
auto_file_ptr f; // file handle for paging
bool reading; // have we begun reading?
// allocate a block
msra::dbn::matrix *newblock() const
{
// we stripe the data across NUMA nodes as to not fill up one node with the feature data
#ifdef _WIN32
msra::numa::overridenode((int) msra::numa::getmostspaciousnumanode());
#endif
msra::dbn::matrix *res = new msra::dbn::matrix(m, elementsperblock);
#ifdef _WIN32
msra::numa::overridenode(-1); // note: we really should reset it also in case of failure
#endif
return res;
}
// handling of page file
bool paging() const
{
return !pagepath.empty();
}
void openpagefile(bool wantread)
{
if (!paging())
return;
msra::files::make_intermediate_dirs(pagepath);
if (!wantread)
{
FILE *ftry = NULL;
std::wstring pathname(pagepath);
ftry = _wfopen(pathname.c_str(), L"wbS");
if (ftry)
fclose(ftry);
}
/*
code below to cycle through a-z appended to file name is no longer necessary
since caller guarantees unique file names via HTKMLFReader
and we want the pagepath logged to the user to be the actual one used by the code
// try to open the pagepath from a to z
if (!wantread)
{
FILE *ftry = NULL;
char trynum = 'a';
while (!ftry && trynum <= 'z')
{
std::wstring pathname (pagepath);
pathname += trynum++;
ftry = _wfopen (pathname.c_str(), L"wbS");
}
if (ftry) fclose (ftry);
pagepath += --trynum;
}
*/
f = fopenOrDie(pagepath, wantread ? L"rbS" : L"wbS");
reading = wantread;
}
void flushlastblock() // during population phase, must be called once per block in sequence
{
if (!paging())
return;
assert(!reading);
if (blocks.empty())
return;
const size_t blockid = blocks.size() - 1;
msra::dbn::matrix &block = *blocks[blockid];
assert(fgetpos(f) == blockid * block.sizeinpagefile());
block.topagefile(f);
blocks[blockid].reset(); // free the memory
assert(blockid * elementsperblock == inmembegin);
inmembegin = inmemend; // empty range
}
void releaseblock(size_t t0) // t0=block start time
{
assert(paging() && reading);
size_t blockid = t0 / elementsperblock;
assert(blockid * elementsperblock == t0);
assert(blocks[blockid]);
fprintf(stderr, "recoverblock: releasing feature block %d [%d..%d)\n", (int) blockid, (int) t0, (int) (t0 + elementsperblock - 1));
blocks[blockid].reset(); // free the memory
}
void recoverblock(size_t t0) // t0=block start time
{
assert(paging() && reading);
size_t blockid = t0 / elementsperblock;
assert(blockid * elementsperblock == t0);
assert(!blocks[blockid]);
fprintf(stderr, "recoverblock: recovering feature block %d [%d..%d)\n", (int) blockid, (int) t0, (int) (t0 + elementsperblock - 1));
blocks[blockid].reset(newblock());
msra::dbn::matrix &block = *blocks[blockid];
fsetpos(f, blockid * block.sizeinpagefile());
block.frompagefile(f);
}
public:
biggrowablevectorarray(const std::wstring &pagepath)
: growablevectorbase(65536), m(0), inmembegin(0), inmemend(0), pagepath(pagepath), reading(false)
{
openpagefile(false);
if (paging())
fprintf(stderr, "biggrowablevectorarray: creating disk backup store at '%ls'\n", pagepath.c_str());
}
~biggrowablevectorarray()
{ // clean up the big temp file
if (paging())
{
fclose(f);
if (_wunlink(pagepath.c_str()) == 0)
fprintf(stderr, "biggrowablevectorarray: deleted disk backup store at '%ls'\n", pagepath.c_str());
else
fprintf(stderr, "biggrowablevectorarray: unable to delete disk backup store at '%ls'\n", pagepath.c_str());
}
}
size_t dim() const
{
return m;
} // dimension of a frame
// reading phase
void push_back(const std::vector<float> &in)
{
assert(!in.empty());
assert(m == 0 || m == in.size());
m = in.size();
const size_t blockid = n / elementsperblock;
assert(blockid <= blocks.size());
if (blockid == blocks.size()) // a new block is needed
{
flushlastblock();
blocks.push_back(std::unique_ptr<msra::dbn::matrix>(newblock()));
}
const size_t blockn = n % elementsperblock;
msra::dbn::matrix &block = *blocks[blockid].get();
foreach_index (k, in)
block(k, blockn) = in[k];
n++;
inmemend = n;
}
void no_more_push_back() // done pushing --switch to consumption mode
{
if (!paging())
return;
// finish off last block
flushlastblock();
fflushOrDie(f);
fprintf(stderr, "biggrowablevectorarray: disk backup store created, %d frames, %lu bytes\n", (int) n, (unsigned long)fgetpos(f));
fclose(f);
foreach_index (i, blocks)
assert(!blocks[i]); // ensure we flushed
assert(inmembegin == inmemend); // nothing in cache
// switch to reading mode
openpagefile(true);
}
// access phase
// Returns 'true' if data was actually read from disk.
bool require(std::pair<size_t, size_t> bounds) // we require this range of frames
{
bool readfromdisk = false;
// get bounds rounded to block boundaries
const size_t ts = bounds.first / elementsperblock * elementsperblock;
const size_t te = std::min(n, (bounds.second + elementsperblock - 1) / elementsperblock * elementsperblock);
assert(paging());
// free all the memmory
for (size_t t = inmembegin; t < inmemend; t += elementsperblock)
{
if (t >= ts && t < te) // if in wanted range then skip to end of it
t = te - elementsperblock;
else
releaseblock(t);
}
// page in all required blocks
for (size_t t = ts; t < te; t += elementsperblock)
{
if (t >= inmembegin && t < inmemend) // if in memory already then skip to end of it
t = inmemend - elementsperblock;
else
{
recoverblock(t);
readfromdisk = true; // tell caller we did something expensive
}
}
// got it
inmembegin = ts;
inmemend = te;
return readfromdisk;
}
const msra::dbn::matrixstripe operator[](size_t t) const // get a feature vector
{
if (t < inmembegin || t >= inmemend)
LogicError("biggrowablevectorarray: attempt to access vector without requesting to page it in first");
const size_t blockt = getblockt(t);
/*const*/ msra::dbn::matrix &block = getblock(t);
return msra::dbn::matrixstripe(block, blockt, 1);
}
std::wstring pagepathname()
{
return pagepath;
}
void cleanuppagefile()
{
if (paging())
{
fclose(f);
if (_wunlink(pagepath.c_str()) == 0)
{
fprintf(stderr, "biggrowablevectorarray: deleted disk backup store at '%ls'\n", pagepath.c_str());
}
else
{
fprintf(stderr, "biggrowablevectorarray: could NOT delete disk backup store at '%ls'\n", pagepath.c_str());
}
}
}
};
// ---------------------------------------------------------------------------
// minibatchframesource -- feature source to provide randomized frames in minibatches
// This is the old code that pages all frames to a huge disk file first.
// (The new minibatchutterancesource pages from input files directly and can also
// operate in utterance mode for MMI training.)
// ---------------------------------------------------------------------------
class minibatchframesource : public minibatchsource
{
size_t vdim; // feature dimension after augmenting neighhors (0: don't read features)
unsigned int sampperiod; // (for reference and to check against model)
std::string featkind;
size_t featdim;
// cache
biggrowablevectorarray frames; // [t][i] all features concatenated
std::vector<char> boundaryflags; // [t] -1 for first and +1 for last frame, 0 else (for augmentneighbors())
std::vector<CLASSIDTYPE> classids; // [t] the state that the frame belongs to
size_t numframes; // total frames (==frames.size()==boundaryflags.size()==classids.size()) unless special modes vdim == 0 and/or no labels
Microsoft::MSR::CNTK::RandomOrdering m_randomOrdering; // [t] -> t'
double timegetbatch;
int verbosity;
public:
// constructor
// Pass empty labels to denote unsupervised training (so getbatch() will not return uids).
minibatchframesource(const std::vector<std::wstring> &infiles, const std::map<std::wstring, std::vector<msra::asr::htkmlfentry>> &labels,
size_t vdim, size_t udim, size_t randomizationrange, const std::wstring &pagepath, const bool mayhavenoframe = false, int addEnergy = 0)
: vdim(vdim), sampperiod(0), featdim(0), numframes(0), frames(pagepath), timegetbatch(0), verbosity(2)
{
if (vdim == 0 && labels.empty())
RuntimeError("minibatchframesource: when running without features, labels are needed");
// at this stage, we simply page in the entire training set at once and work off RAM
// We will benefit from feature archives indirectly through htkfeatio.
// TODO:
// - infiles must specify time range
// - at this stage only reserve() (we know the time range; allocate second-layer structure)
// - implement block-wise paging directly from HTK feature files through htkfeatreader
featkind.clear();
std::vector<float> frame;
fprintf(stderr, "minibatchframesource: reading %d utterances..", (int) infiles.size());
size_t numclasses = 0; // number of units found (actually max id +1)
size_t notfound = 0; // number of entries missing in MLF
msra::asr::htkfeatreader reader; // feature reader
reader.AddEnergy(addEnergy);
foreach_index (i, infiles)
{
if (i % (infiles.size() / 100 + 1) == 0)
{
fprintf(stderr, ".");
fflush(stderr);
}
msra::basetypes::matrix<float> feat;
msra::asr::htkfeatreader::parsedpath ppath(infiles[i]);
// skip files for which labels don't exist (assuming bad alignment)
std::wstring key;
if (!labels.empty()) // empty means unsupervised mode (don't load any)
{
#ifdef _WIN32
key = regex_replace((std::wstring) ppath, std::wregex(L"\\.[^\\.\\\\/:]*$"), std::wstring()); // delete extension (or not if none)
#else
key = removeExtension(ppath);
#endif
if (labels.find(key) == labels.end())
{
if (notfound < 5)
fprintf(stderr, "\nminibatchframesource: %d-th file not found in MLF label set: %ls", i, key.c_str());
notfound++;
continue; // skip this utterance at all
}
}
// get feature frames
if (vdim != 0) // (vdim == special mode to not read features at all)
{
msra::util::attempt(5, [&]()
{
reader.read(ppath, featkind, sampperiod, feat); // whole file read as columns of feature vectors
});
if (featdim == 0) // first time
featdim = feat.rows();
else if (featdim != feat.rows())
RuntimeError("minibatchframesource: inconsistent feature dimension across files");
// HVite occasionally generates mismatching output --skip such files
if (!key.empty()) // (we have a key if supervised mode)
{
const auto &labseq = labels.find(key)->second; // (we already checked above that it exists)
size_t labframes = labseq.empty() ? 0 : (labseq[labseq.size() - 1].firstframe + labseq[labseq.size() - 1].numframes);
if (abs((int) labframes - (int) feat.cols()) > 0)
{
fprintf(stderr, "\nminibatchframesource: %d-th file has small duration mismatch (%d in label vs. %d in feat file), skipping: %ls", i, (int) labframes, (int) feat.cols(), key.c_str());
notfound++;
continue; // skip this utterance at all
}
}
// append to cache
frame.resize(featdim);
if (feat.cols() < 2) // (2 frames needed for boundary markers)
RuntimeError("minibatchframesource: utterances < 2 frames not supported");
foreach_column (t, feat)
{
foreach_index (k, frame)
frame[k] = feat(k, t);
frames.push_back(frame);
numframes++;
boundaryflags.push_back((t == 0) ? -1 : (t == feat.cols() - 1) ? +1 : 0);
}
assert(numframes == frames.size());
assert(numframes == boundaryflags.size());
}
// get label sequence
if (!key.empty()) // (we have a key if supervised mode)
{
const auto &labseq = labels.find(key)->second; // (we already checked above that it exists)
foreach_index (i2, labseq)
{
const auto &e = labseq[i2];
if ((i2 > 0 && labseq[i2 - 1].firstframe + labseq[i2 - 1].numframes != e.firstframe) || (i2 == 0 && e.firstframe != 0))
RuntimeError("minibatchframesource: labels not in consecutive order MLF in label set: %ls", key.c_str());
for (size_t t = e.firstframe; t < e.firstframe + e.numframes; t++)
{
if (e.classid >= udim)
RuntimeError("minibatchframesource: class id exceeds model dimension in file %ls", key.c_str());
if (e.classid != (CLASSIDTYPE) e.classid)
RuntimeError("CLASSIDTYPE has too few bits");
classids.push_back((CLASSIDTYPE) e.classid);
numclasses = std::max(numclasses, (size_t)(1u + e.classid));
}
}
if (vdim == 0)
numframes = classids.size();
if (numframes != classids.size()) // TODO: remove this once we are confident
RuntimeError("minibatchframesource: label duration inconsistent with feature file in MLF label set: %ls", key.c_str());
assert(numframes == classids.size());
}
else
{
assert(classids.empty()); // that's how we detect it later
}
}
assert(vdim == 0 || numframes == frames.size());
assert(labels.empty() || numframes == classids.size());
if ((vdim != 0 && numframes != frames.size()) || (!labels.empty() && numframes != classids.size()))
RuntimeError("minibatchframesource: numframes variable screwup");
fprintf(stderr, " %d frames read from %d utterances; %d classes\n", (int) numframes, (int) infiles.size(), (int) numclasses);
if (notfound > 0)
{
fprintf(stderr, "minibatchframesource: %d files out of %d not found in label set\n", (int) notfound, (int) infiles.size());
if (notfound > infiles.size() / 2)
RuntimeError("minibatchframesource: too many files not found in label set--assuming broken configuration\n");
}
if (numframes == 0 && !mayhavenoframe)
RuntimeError("minibatchframesource: no input features given!");
// notify frames source to switch from population to consumption mode
frames.no_more_push_back();
// initialize randomizer
if (numframes > 0)
m_randomOrdering.Resize(numframes, randomizationrange);
}
virtual ~minibatchframesource()
{
}
size_t totalframes() const
{
assert(vdim == 0 || numframes == frames.size());
assert(!issupervised() || numframes == classids.size());
return numframes;
}
bool issupervised() const
{
return !classids.empty();
}
void setverbosity(int newverbosity)
{
verbosity = newverbosity;
}
// retrieve one minibatch
// Minibatches are deterministic pseudo-random samples. The entire corpus
// is repeated infinitely, but each repetition (a 'sweep') is randomized
// differently.
// This function allows to retrieve a mini-batch starting from any frame
// within this infinitely extended repetition. To the end, mini-batches are
// specified by start frame and #frames.
// This function returns the same data independent on #frames, i.e. the concept
// of the mini-batch is not defined in here, but on the caller side. The caller
// can retrieve the frames of a mini-batch in chunks that do not match the
// caller's definition of "mini-batch," e.g. bigger or smaller chunks.
// If a requested mini-batch spans a sweep boundary, then this function will
// not return samples after the sweep boundary. Instead, the returned frame
// set is shortened to not exceed the end of the sweep. The caller must make
// a separate second call to get the rest. In trainlayer(), the one
// sweep-boundary-spanning mini-batch will simply be shortened.
// This function is NOT thread-safe (due to caching of random sequence).
bool getbatch(const size_t globalts, const size_t framesrequested, msra::dbn::matrix &feat, std::vector<size_t> &uids,
std::vector<const_array_ref<msra::lattices::lattice::htkmlfwordsequence::word>> &transcripts,
std::vector<std::shared_ptr<const latticesource::latticepair>> &latticepairs)
{
auto_timer timergetbatch;
transcripts.clear(); // word-level transcripts not supported by frame source (aimed at MMI)
latticepairs.clear(); // neither are lattices
assert(totalframes() > 0);
const size_t sweep = globalts / totalframes(); // which sweep (this determines randomization)
const size_t ts = globalts % totalframes(); // start frame within the sweep
const size_t te = std::min(ts + framesrequested, totalframes()); // do not go beyond sweep boundary
assert(te > ts);
if (verbosity >= 2)
fprintf(stderr, "getbatch: frames [%d..%d] in sweep %d\n", (int) ts, (int) (te - 1), (int) sweep);
// get random sequence (each time index occurs exactly once)
// If the sweep changes, this will re-cache the sequence. We optimize for rare, monotonous sweep changes.
const auto &tmap = m_randomOrdering(sweep);
// page in the needed range of frames
const size_t extent = augmentationextent(frames.dim(), vdim);
bool readfromdisk = frames.require(m_randomOrdering.Bounds(std::max(ts, extent) - extent, te + 1 + extent));
// generate features and uids
feat.resize(vdim, te - ts); // note: special mode vdim == 0 means no features to be loaded
if (issupervised()) // empty means unsupervised training -> return empty uids
uids.resize(te - ts);
else
uids.clear();
for (size_t t = ts; t < te; t++)
{
size_t trand = m_randomOrdering.IsRandomizationDisabled() ? t : tmap[t]; // the random-sequence sample point for this point in time
if (vdim != 0)
{
auto v_t = feat.col(t - ts); // the vector to fill in
augmentneighbors(frames, boundaryflags, trand, v_t);
}
if (issupervised())
uids[t - ts] = classids[trand];
}
timegetbatch = timergetbatch;
return readfromdisk;
}
bool getbatch(const size_t globalts, const size_t framesrequested, std::vector<msra::dbn::matrix> &feat, std::vector<std::vector<size_t>> &uids,
std::vector<const_array_ref<msra::lattices::lattice::htkmlfwordsequence::word>> &transcripts,
std::vector<std::shared_ptr<const latticesource::latticepair>> &latticepairs, std::vector<std::vector<size_t>> &sentendmark,
std::vector<std::vector<size_t>> &phoneboundaries)
{
// for single input/output set size to be 1 and run old getbatch
feat.resize(1);
uids.resize(1);
// transcripts.resize(1);
// latticepairs.resize(1);
sentendmark.resize(1);
phoneboundaries.resize(1);
return getbatch(globalts, framesrequested, feat[0], uids[0], transcripts, latticepairs);
}
double gettimegetbatch()
{
return timegetbatch;
}
// return first valid globalts to ask getbatch() for
// In frame mode, there is no constraint, i.e. it is 'globalts' itself.
/*implement*/ size_t firstvalidglobalts(const size_t globalts)
{
return globalts;
}
/*implement*/ const std::vector<size_t> &unitcounts() const
{
LogicError("unitcounts: not implemented for this feature source");
static std::vector<size_t> x;
return x; /*keep compiler happy*/
}
};
// ---------------------------------------------------------------------------
// minibatchframesourcemulti -- feature source to provide randomized frames in minibatches
// this is derived from minibatchframesource but worked with multiple inputs and/or outputs
// by making "frames" and "classids" a vector of vectors
// ---------------------------------------------------------------------------
class minibatchframesourcemulti : public minibatchsource
{
std::vector<size_t> vdim; // feature dimension after augmenting neighhors (0: don't read features)
std::vector<size_t> leftcontext; // number of frames to the left of the target frame in the context window
std::vector<size_t> rightcontext; // number of frames to the right of the target frame in the context window
unsigned int sampperiod; // (for reference and to check against model)
std::string featkind;
size_t featdim;
size_t maxvdim;
// cache
// std::vector<biggrowablevectorarray> frames;
std::vector<std::unique_ptr<biggrowablevectorarray>> pframes; // [t][i] all features concatenated
std::vector<char> boundaryflags; // [t] -1 for first and +1 for last frame, 0 else (for augmentneighbors())
std::vector<std::vector<CLASSIDTYPE>> classids; // [t] the state that the frame belongs to
size_t numframes; // total frames (==frames.size()==boundaryflags.size()==classids.size()) unless special modes vdim == 0 and/or no labels
Microsoft::MSR::CNTK::RandomOrdering m_randomOrdering; // [t] -> t'
double timegetbatch;
int verbosity;
public:
// constructor
// Pass empty labels to denote unsupervised training (so getbatch() will not return uids).
minibatchframesourcemulti(const std::vector<std::vector<std::wstring>> &infiles, const std::vector<std::map<std::wstring, std::vector<msra::asr::htkmlfentry>>> &labels,
std::vector<size_t> vdim, std::vector<size_t> udim, std::vector<size_t> leftcontext, std::vector<size_t> rightcontext, size_t randomizationrange, const std::vector<std::wstring> &pagepath, const bool mayhavenoframe = false, int addEnergy = 0)
: vdim(vdim), leftcontext(leftcontext), rightcontext(rightcontext), sampperiod(0), featdim(0), numframes(0), timegetbatch(0), verbosity(2), maxvdim(0)
{
if (vdim[0] == 0 && labels.empty())
RuntimeError("minibatchframesourcemulti: when running without features, labels are needed");
// at this stage, we simply page in the entire training set at once and work off RAM
// We will benefit from feature archives indirectly through htkfeatio.
// TODO:
// - infiles must specify time range
// - at this stage only reserve() (we know the time range; allocate second-layer structure)
// - implement block-wise paging directly from HTK feature files through htkfeatreader
featkind.clear();
std::vector<float> frame;
std::vector<size_t> numclasses; // number of units found (actually max id +1)
size_t notfound = 0; // number of entries missing in MLF
std::vector<size_t> framesaccum;
if (infiles.size() == 0)
RuntimeError("minibatchframesourcemulti: need at least one network input specified with features");
if (labels.size() == 0)
fprintf(stderr, "no MLF label files detected\n");
foreach_index (i, infiles)
{
pframes.push_back(std::unique_ptr<biggrowablevectorarray>(new biggrowablevectorarray(pagepath[i])));
if (vdim[i] > maxvdim)
maxvdim = vdim[i];
}
foreach_index (i, labels)
{
classids.push_back(std::vector<CLASSIDTYPE>());
numclasses.push_back(0);
}
fprintf(stderr, "minibatchframesourcemulti: reading %d feature sets and %d label sets...", (int) infiles.size(), (int) labels.size());
foreach_index (m, infiles)
{
featdim = 0;
numframes = 0;
featkind.clear();
msra::asr::htkfeatreader reader; // feature reader
reader.AddEnergy(addEnergy);
foreach_index (i, infiles[m]) // read each feature file in set m
{
if (i % (infiles[m].size() / 100 + 1) == 0)
{
fprintf(stderr, ".");
fflush(stderr);
}
msra::basetypes::matrix<float> feat;
msra::asr::htkfeatreader::parsedpath ppath(infiles[m][i]);
// skip files for which labels don't exist (assuming bad alignment)
std::wstring key;
if (!labels.empty())
{
if (!labels[0].empty()) // empty means unsupervised mode (don't load any)
{
#ifdef _WIN32
key = regex_replace((std::wstring) ppath, std::wregex(L"\\.[^\\.\\\\/:]*$"), std::wstring()); // delete extension (or not if none)
#else
key = removeExtension(ppath);
#endif
if (labels[0].find(key) == labels[0].end())
{
if (notfound < 5)
fprintf(stderr, "\nminibatchframesourcemulti: %d-th file not found in MLF label set: %ls", i, key.c_str());
notfound++;
continue; // skip this utterance at all
}
}
}
// get feature frames
if (vdim[m] != 0) // (vdim == special mode to not read features at all)
{
msra::util::attempt(5, [&]()
{
reader.read(ppath, featkind, sampperiod, feat); // whole file read as columns of feature vectors
});
if (featdim == 0) // first time
featdim = feat.rows();
else if (featdim != feat.rows())
RuntimeError("minibatchframesourcemulti: inconsistent feature dimension across files");
// HVite occasionally generates mismatching output --skip such files
if (!key.empty()) // (we have a key if supervised mode)
{
const auto &labseq = labels[0].find(key)->second; // (we already checked above that it exists)
size_t labframes = labseq.empty() ? 0 : (labseq[labseq.size() - 1].firstframe + labseq[labseq.size() - 1].numframes);
if (abs((int) labframes - (int) feat.cols()) > 0)
{
fprintf(stderr, "\nminibatchframesourcemulti: %d-th file has small duration mismatch (%d in label vs. %d in feat file), skipping: %ls", i, (int) labframes, (int) feat.cols(), key.c_str());
notfound++;
continue; // skip this utterance at all
}
}
// append to cache
frame.resize(featdim);
if (feat.cols() < 2) // (2 frames needed for boundary markers)
RuntimeError("minibatchframesourcemulti: utterances < 2 frames not supported");
foreach_column (t, feat)
{
foreach_index (k, frame)
frame[k] = feat(k, t);
pframes[m]->push_back(frame);
numframes++;
if (m == 0)
boundaryflags.push_back((t == 0) ? -1 : (t == feat.cols() - 1) ? +1 : 0);
}
if (m == 0)
framesaccum.push_back(numframes);
else
assert(numframes == framesaccum[i]);
assert(numframes == pframes[m]->size());
}
if (m == 0)
assert(numframes == boundaryflags.size());
if (m == 0) // after we get the key for this file, read all labels (only done for first feature)
{
if (!key.empty())
{
foreach_index (j, labels)
{
const auto &labseq = labels[j].find(key)->second; // (we already checked above that it exists)
foreach_index (i2, labseq)
{
const auto &e = labseq[i2];
if ((i2 > 0 && labseq[i2 - 1].firstframe + labseq[i2 - 1].numframes != e.firstframe) || (i2 == 0 && e.firstframe != 0))
RuntimeError("minibatchframesourcemulti: labels not in consecutive order MLF in label set: %ls", key.c_str());
for (size_t t = e.firstframe; t < e.firstframe + e.numframes; t++)
{
if (e.classid >= udim[j])
RuntimeError("minibatchframesourcemulti: class id exceeds model dimension in file %ls", key.c_str());
if (e.classid != (CLASSIDTYPE) e.classid)
RuntimeError("CLASSIDTYPE has too few bits");
classids[j].push_back((CLASSIDTYPE) e.classid);
numclasses[j] = std::max(numclasses[j], (size_t)(1u + e.classid));
}
}
if (vdim[m] == 0)
numframes = classids[j].size();
if (numframes != classids[j].size()) // TODO: remove this once we are confident
RuntimeError("minibatchframesourcemulti: label duration inconsistent with feature file in MLF label set: %ls", key.c_str());
assert(numframes == classids[j].size());
}
}
else
{
assert(classids.empty());
}
}
}
assert(vdim[m] == 0 || numframes == pframes[m]->size());
foreach_index (j, labels)
assert(labels[j].empty() || numframes == classids[j].size());
if (vdim[m] != 0 && numframes != pframes[m]->size()) // || (!labels.empty() && numframes != classids.size()))
RuntimeError("\nminibatchframesource: numframes variable screwup");
if (m == 0)
{
foreach_index (j, numclasses)
fprintf(stderr, "\nminibatchframesourcemulti: read label set %d: %d classes\n", j, (int) numclasses[j]);
}
fprintf(stderr, "\nminibatchframesourcemulti: feature set %d: %d frames read from %d utterances\n", m, (int) pframes[m]->size(), (int) infiles[m].size());
if (notfound > 0)
{
fprintf(stderr, "minibatchframesourcemulti: %d files out of %d not found in label set\n", (int) notfound, (int) infiles[m].size());
if (notfound > infiles[m].size() / 2)
RuntimeError("minibatchframesourcemulti: too many files not found in label set--assuming broken configuration\n");
}
// notify frames source to switch from population to consumption mode
pframes[m]->no_more_push_back();
}
if (numframes == 0 && !mayhavenoframe)
RuntimeError("minibatchframesource: no input features given!");
// initialize randomizer
if (numframes > 0)
m_randomOrdering.Resize(numframes, randomizationrange);
}
virtual ~minibatchframesourcemulti()
{
}
size_t totalframes() const
{
assert(maxvdim == 0 || numframes == pframes[0]->size());
assert(!issupervised() || numframes == classids[0].size());
return numframes;
}
bool issupervised() const
{
return !classids.empty();
}
void setverbosity(int newverbosity)
{
verbosity = newverbosity;
}
// retrieve one minibatch
// Minibatches are deterministic pseudo-random samples. The entire corpus
// is repeated infinitely, but each repetition (a 'sweep') is randomized
// differently.
// This function allows to retrieve a mini-batch starting from any frame
// within this infinitely extended repetition. To the end, mini-batches are
// specified by start frame and #frames.
// This function returns the same data independent on #frames, i.e. the concept
// of the mini-batch is not defined in here, but on the caller side. The caller
// can retrieve the frames of a mini-batch in chunks that do not match the
// caller's definition of "mini-batch," e.g. bigger or smaller chunks.
// If a requested mini-batch spans a sweep boundary, then this function will
// not return samples after the sweep boundary. Instead, the returned frame
// set is shortened to not exceed the end of the sweep. The caller must make
// a separate second call to get the rest. In trainlayer(), the one
// sweep-boundary-spanning mini-batch will simply be shortened.
// This function is NOT thread-safe (due to caching of random sequence).
bool getbatch(const size_t globalts, const size_t framesrequested, std::vector<msra::dbn::matrix> &feat, std::vector<std::vector<size_t>> &uids,
std::vector<const_array_ref<msra::lattices::lattice::htkmlfwordsequence::word>> &transcripts,
std::vector<std::shared_ptr<const latticesource::latticepair>> &latticepairs, std::vector<std::vector<size_t>> &sentendmark,
std::vector<std::vector<size_t>> &phoneboundaries)
{
auto_timer timergetbatch;
bool readfromdisk;
size_t nreadfromdisk = 0;
transcripts.clear(); // word-level transcripts not supported by frame source (aimed at MMI)
latticepairs.clear(); // neither are lattices
assert(totalframes() > 0);
const size_t sweep = globalts / totalframes(); // which sweep (this determines randomization)
const size_t ts = globalts % totalframes(); // start frame within the sweep
const size_t te = std::min(ts + framesrequested, totalframes()); // do not go beyond sweep boundary
assert(te > ts);
if (verbosity >= 2)
fprintf(stderr, "getbatch: frames [%d..%d] in sweep %d\n", (int) ts, (int) (te - 1), (int) sweep);
// get random sequence (each time index occurs exactly once)
// If the sweep changes, this will re-cache the sequence. We optimize for rare, monotonous sweep changes.
const auto &tmap = m_randomOrdering(sweep);
feat.resize(pframes.size());
uids.resize(classids.size());
sentendmark.resize(classids.size());
phoneboundaries.resize(classids.size());
foreach_index (i, feat)
{
size_t leftextent, rightextent;
// page in the needed range of frames
if (leftcontext[i] == 0 && rightcontext[i] == 0)
{
leftextent = rightextent = augmentationextent(pframes[i]->dim(), vdim[i]);
}
else
{
leftextent = leftcontext[i];
rightextent = rightcontext[i];
}
readfromdisk = pframes[i]->require(m_randomOrdering.Bounds(std::max(ts, leftextent) - leftextent, te + 1 + rightextent));
// generate features and uids
feat[i].resize(vdim[i], te - ts); // note: special mode vdim == 0 means no features to be loaded
if (issupervised()) // empty means unsupervised training -> return empty uids
foreach_index (j, uids)
uids[j].resize(te - ts);
else
uids.clear();
for (size_t t = ts; t < te; t++)
{
size_t trand = m_randomOrdering.IsRandomizationDisabled() ? t : tmap[t]; // the random-sequence sample point for this point in time
if (vdim[i] != 0)
{
auto v_t = feat[i].col(t - ts); // the vector to fill in
augmentneighbors(*pframes[i], boundaryflags, trand, leftextent, rightextent, v_t);
}
if (i == 0)
{ // read labels for all outputs on first pass thru features. this guarantees they will be read if only one feature set but > 1 label set
if (issupervised())
foreach_index (j, uids)
uids[j][t - ts] = classids[j][trand];
}
}
timegetbatch = timergetbatch;
if (readfromdisk)
nreadfromdisk++;
}
(nreadfromdisk == feat.size()) ? readfromdisk = true : readfromdisk = false;
return readfromdisk;
}
bool getbatch(const size_t /*globalts*/, const size_t /*framesrequested*/, msra::dbn::matrix & /*feat*/, std::vector<size_t> & /*uids*/,
std::vector<const_array_ref<msra::lattices::lattice::htkmlfwordsequence::word>> & /*transcripts*/,
std::vector<std::shared_ptr<const latticesource::latticepair>> & /*latticepairs*/)
{
// should never get here
RuntimeError("minibatchframesourcemulti: getbatch() being called for single input feature and single output feature, should use minibatchframesource instead\n");
}
double gettimegetbatch()
{
return timegetbatch;
}
// return first valid globalts to ask getbatch() for
// In frame mode, there is no constraint, i.e. it is 'globalts' itself.
/*implement*/ size_t firstvalidglobalts(const size_t globalts)
{
return globalts;
}
/*implement*/ const std::vector<size_t> &unitcounts() const
{
LogicError("unitcounts: not implemented for this feature source");
}
};
};
};