swh:1:snp:f50ab94432af916b5fb8b4ad831e8dddded77084
Raw File
Tip revision: 6a8773bcf259077f915a302cf2ad0338c9344dce authored by Mark Hamilton on 01 March 2018, 19:17:13 UTC
Bump pom.xml version
Tip revision: 6a8773b
UCIFastReader.cpp
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
// UCIFastReader.cpp : Defines the exported functions for the DLL application.
//

#include "stdafx.h"
#define DATAREADER_EXPORTS // creating the exports here
#include "DataReader.h"
#include "UCIFastReader.h"
#ifdef LEAKDETECT
#include <vld.h> // leak detection
#endif
#include "fileutil.h" // for fexists()

namespace Microsoft { namespace MSR { namespace CNTK {

template <class ElemType>
size_t UCIFastReader<ElemType>::RandomizeSweep(size_t mbStartSample)
{
    // size_t randomRangePerEpoch = (m_epochSize+m_randomizeRange-1)/m_randomizeRange;
    // return m_epoch*randomRangePerEpoch + epochSample/m_randomizeRange;
    return mbStartSample / m_randomizeRange;
}

// ReadLine - Read a line
// readSample - sample to read in global sample space
// returns - true if we successfully read a record, otherwise false
template <class ElemType>
bool UCIFastReader<ElemType>::ReadRecord(size_t /*readSample*/)
{
    return false; // not used
}

// RecordsToRead - Determine number of records to read to populate record buffers
// mbStartSample - the starting sample from which to read
// tail - we are checking for possible remainer records to read (default false)
// returns - true if we have more to read, false if we hit the end of the dataset
template <class ElemType>
size_t UCIFastReader<ElemType>::RecordsToRead(size_t mbStartSample, bool tail)
{
    assert(mbStartSample >= m_epochStartSample);
    // determine how far ahead we need to read
    bool randomize = Randomize();
    // need to read to the end of the next minibatch
    size_t epochSample = mbStartSample;
    epochSample %= m_epochSize;

    // determine number left to read for this epoch
    size_t numberToEpoch = m_epochSize - epochSample;
    // we will take either a minibatch or the number left in the epoch
    size_t numberToRead = min(numberToEpoch, m_mbSize);
    if (numberToRead == 0 && !tail)
        numberToRead = m_mbSize;

    if (randomize)
    {
        size_t randomizeSweep = RandomizeSweep(mbStartSample);
        // if first read or read takes us to another randomization range
        // we need to read at least randomization range records
        if (randomizeSweep != m_randomordering.CurrentSeed()) // the range has changed since last time
        {
            numberToRead = RoundUp(epochSample, m_randomizeRange) - epochSample;
            if (numberToRead == 0 && !tail)
                numberToRead = m_randomizeRange;
        }
    }
    return numberToRead;
}

// EnsureDataAvailable - Read enough lines so we can request a minibatch starting as requested
// mbStartSample - the starting sample we are ensureing are good
// endOfDataCheck - check if we are at the end of the dataset (no wraparound)
// returns - true if we have more to read, false if we hit the end of the dataset
template <class ElemType>
bool UCIFastReader<ElemType>::EnsureDataAvailable(size_t mbStartSample, bool endOfDataCheck)
{
    assert(mbStartSample >= m_epochStartSample);
    // determine how far ahead we need to read
    Randomize();
    // need to read to the end of the next minibatch
    size_t epochSample = mbStartSample;
    epochSample %= m_epochSize;
    bool moreToRead = true;

    size_t numberToRead = RecordsToRead(mbStartSample);

    // check to see if we have the proper records read already
    if (m_readNextSample >= mbStartSample + numberToRead && mbStartSample >= m_epochStartSample)
        return true;

    // truncate the present arrays to the location we are reading from, parser appends on these arrays
    if (m_featureData.size() > epochSample * m_featureCount) // should be this size, if not, truncate
        m_featureData.resize(epochSample * m_featureCount);
    if (m_labelType == labelCategory && m_labelData.size() > epochSample)
    {
        m_labelIdData.resize(epochSample);
        m_labelData.resize(epochSample);
    }
    else if (m_labelType != labelNone && m_labelData.size() > epochSample * m_labelDim)
    {
        m_labelData.resize(epochSample * m_labelDim);
    }

    int recordsRead = 0;
    do
    {
        int numRead = m_parser->Parse(numberToRead - recordsRead, &m_featureData, &m_labelData);

        recordsRead += numRead;
        if (!m_endReached)
            m_totalSamples += numRead; // total number of records in the dataset

        // we should only get less records than requested at when we hit the end of the dataset
        if (recordsRead < numberToRead)
        {
            // update dataset variables
            size_t additionalToRead = UpdateDataVariables(mbStartSample + recordsRead);

            m_parser->SetFilePosition(0); // make another pass of the dataset

            // if doing and end of data check, and we are at the end
            // or a partial minibatch was found exit now
            if ((endOfDataCheck && recordsRead == 0) ||
                (m_partialMinibatch && recordsRead > 0))
            {
                moreToRead = false;
                break;
            }

            // get the additional number to read
            numberToRead = recordsRead + additionalToRead;
        }
    } while (recordsRead < numberToRead);
    m_readNextSample += recordsRead;

    // for category labels, we need to build up a list of IDs and a mapping table
    if (m_labelType == labelCategory)
    {
        // loop through all the newly read records
        for (int numberRead = 0; numberRead < recordsRead; numberRead++)
        {
            LabelType& label = m_labelData[epochSample + numberRead];
            // check to see if we have seen this label before
            auto value = m_mapLabelToId.find(label);
            LabelIdType labelId;
            if (value == m_mapLabelToId.end())
            {
                if (m_labelFileToWrite.empty())
                    RuntimeError("label found in data not specified in label mapping file: %s", label.c_str());
                // new label so add it to the mapping tables
                m_mapLabelToId[label] = m_labelIdMax;
                m_mapIdToLabel[m_labelIdMax] = label;
                labelId = m_labelIdMax++;

                // if our label dimension is lower than the current labelId then increase it
                if (m_labelDim < m_labelIdMax)
                    m_labelDim = m_labelIdMax;
            }
            else
            {
                labelId = value->second;
            }

            // now add the label id to the label data array
            m_labelIdData.push_back(labelId);
        }
    }
    // if there more to read (always is, unless we want partial minibatches
    return moreToRead;
}

// UpdateDataVariables - Update variables that depend on the dataset being completely read
template <class ElemType>
size_t UCIFastReader<ElemType>::UpdateDataVariables(size_t mbStartSample)
{
    // if we haven't been all the way through the file yet
    if (!m_endReached)
    {
        // get the size of the dataset
        assert(m_totalSamples * m_featureCount >= m_featureData.size());

        // if they want us to determine epoch size based on dataset size, do that
        if (m_epochSize == requestDataSize)
        {
            // set the epoch size to be a multiple of mbSize or randomization range
            if (m_partialMinibatch)
                m_epochSize = m_totalSamples;
            else
            {
                size_t roundUpTo = m_mbSize;
                if (m_randomizeRange != randomizeAuto && m_randomizeRange != randomizeNone)
                    roundUpTo = m_randomizeRange;
                m_epochSize = RoundUp(m_totalSamples, roundUpTo);
            }
        }

        // make sure randomization range is within the sample bounds
        if (m_randomizeRange > m_epochSize)
        {
            m_randomizeRange = m_epochSize;
            m_randomordering.Resize(m_randomizeRange, m_randomizeRange);
        }

        // write the label file if we hit the end of the file
        WriteLabelFile();

        // we got to the end of the dataset
        m_endReached = true;
    }

    // update the label dimension if it is not big enough, need it here because m_labelIdMax get's updated in the processing loop (after a read)
    if (m_labelType == labelCategory && m_labelIdMax > m_labelDim)
        m_labelDim = m_labelIdMax; // update the label dimensions if different

    bool recordsToRead = mbStartSample < m_epochStartSample + m_epochSize; // still some to read after potential epochSize change?
    return recordsToRead ? RecordsToRead(mbStartSample) : 0;
}

template <class ElemType>
void UCIFastReader<ElemType>::WriteLabelFile()
{
    // write out the label file if they don't have one
    if (!m_labelFileToWrite.empty())
    {
        if (m_mapIdToLabel.size() > 0)
        {
            File labelFile(m_labelFileToWrite, fileOptionsWrite | fileOptionsText);
            for (int i = 0; i < m_mapIdToLabel.size(); ++i)
            {
                labelFile << m_mapIdToLabel[i] << '\n';
            }
            fprintf(stderr, "label file %ls written to disk\n", m_labelFileToWrite.c_str());
            m_labelFileToWrite.clear();
        }
        else if (!m_cachingWriter)
        {
            fprintf(stderr, "WARNING: file %ls NOT written to disk yet, will be written the first time the end of the entire dataset is found.\n", m_labelFileToWrite.c_str());
        }
    }
}

// Destroy - cleanup and remove this class
// NOTE: this destroys the object, and it can't be used past this point
template <class ElemType>
void UCIFastReader<ElemType>::Destroy()
{
    delete this;
}

// Init - Reader Initialize for multiple data sets
// config - [in] configuration parameters for the datareader
// Sample format below:
//# Parameter values for the reader
//reader=[
//  # reader to use
//  readerType=UCIFastReader
//  miniBatchMode=Partial
//  randomize=None
//  features=[
//    dim=784
//    start=1
//    file=c:\speech\mnist\mnist_test.txt
//  ]
//  labels=[
//    dim=1
//      start=0
//      file=c:\speech\mnist\mnist_test.txt
//      labelMappingFile=c:\speech\mnist\labels.txt
//      labelDim=10
//      labelType=Category
//  ]
//]
template <class ElemType>
template <class ConfigRecordType>
void UCIFastReader<ElemType>::InitFromConfig(const ConfigRecordType& readerConfig)
{
    //customize delimiter and decimal points.
    string customDelimiterStr = readerConfig(L"customDelimiter", "");
    char customDelimiter = customDelimiterStr == "" ? char(0) : customDelimiterStr[0];

    string customDecimalPointStr = readerConfig(L"customDecimalPoint", "");
    char customDecimalPoint = customDecimalPointStr == "" ? char(0) : customDecimalPointStr[0];

    m_parser = make_shared<UCIParser<ElemType, LabelType>>(customDelimiter, customDecimalPoint);

    // See if the user wants caching
    m_cachingReader = NULL;
    m_cachingWriter = NULL;

    // initialize the cache
    InitCache(readerConfig);

    // if we have a cache, no need to parse the text files...
    if (m_cachingReader)
        return;

    std::vector<std::wstring> features;
    std::vector<std::wstring> labels;
    GetFileConfigNames(readerConfig, features, labels);
    if (features.size() > 0)
    {
        m_featuresName = features[0];
        if (!readerConfig.Exists(m_featuresName)) // BUGBUG: How can this ever fire? We wouldn't be able to get this name in the first place if it wasn't there.
            RuntimeError("features file not found, required in configuration: i.e. 'features=[file=c:\\myfile.txt;start=1;dim=123]'");
    }
    if (labels.size() > 0)
    {
        m_labelsName = labels[0];
    }
    bool hasLabels = readerConfig.Exists(m_labelsName);
    if (!hasLabels)
        fprintf(stderr, "Warning: labels are not specified.");

    const ConfigRecordType& configFeatures = readerConfig(m_featuresName.c_str(), ConfigRecordType::Record());
    const ConfigRecordType& configLabels = readerConfig(m_labelsName.c_str(), ConfigRecordType::Record());

    // determine label type desired
    std::wstring labelType;
    if (!hasLabels)
        labelType = L"none";
    else
        labelType = (wstring)configLabels(L"labelType", L"category");

    if (!EqualCI(labelType, L"none") && configFeatures(L"file", L"") != configLabels(L"file", L""))
        RuntimeError("features and label files must be the same file, use separate readers to define single use files");

    size_t vdim = configFeatures(L"dim");
    // string name = configFeatures.Name();            // TODO: Aaargh!!!
    size_t udim = configLabels(L"labelDim", (size_t) 0);

    // initialize all the variables
    m_mbStartSample = m_epoch = m_totalSamples = m_epochStartSample = 0;
    m_labelIdMax = m_labelDim = 0;
    m_partialMinibatch = m_endReached = false;
    m_labelType = labelCategory;
    m_featureCount = vdim;
    m_readNextSample = 0;
    m_traceLevel = readerConfig(L"traceLevel", 0);
    m_parser->SetTraceLevel(m_traceLevel);

    m_prefetchEnabled = readerConfig(L"prefetch", false);
    // set the feature count to at least one (we better have one feature...)
    assert(m_featureCount != 0);

    if (readerConfig.Exists(L"randomize"))
    {
        string randomizeString = readerConfig(L"randomize");
        if (EqualCI(randomizeString, "none"))
        {
            m_randomizeRange = randomizeNone;
        }
        else if (EqualCI(randomizeString, "auto"))
        {
            m_randomizeRange = randomizeAuto;
        }
        else
        {
            m_randomizeRange = readerConfig(L"randomize");
        }
    }
    else
    {
        m_randomizeRange = randomizeAuto;
    }

    // determine if we partial minibatches are desired
    std::string minibatchMode(readerConfig(L"minibatchMode", "partial"));
    m_partialMinibatch = EqualCI(minibatchMode, "partial");

    // get start and dimensions for labels and features
    size_t startLabels = configLabels(L"start", (size_t) 0);
    size_t dimLabels = configLabels(L"dim", (size_t) 0);

    size_t startFeatures = configFeatures(L"start", (size_t) 0);
    size_t dimFeatures = configFeatures(L"dim", (size_t) 0);


    // convert to lower case for case insensitive comparison
    if (EqualCI(labelType, L"category"))
    {
        m_labelType = labelCategory;
    }
    else if (EqualCI(labelType, L"regression"))
    {
        m_labelType = labelRegression;
    }
    else if (EqualCI(labelType, L"none"))
    {
        m_labelType = labelNone;
        dimLabels = 0; // override for no labels
    }

    std::wstring file = configFeatures(L"file");
    if (m_traceLevel > 0)
        fprintf(stderr, "Reading UCI file %ls\n", file.c_str());

    // Simple heuristic to ensure buffer size and avoid breaking existing experiments.
    size_t bufSize = max(dimFeatures * 16, (size_t) 256 * 1024);
    m_parser->ParseInit(file.c_str(), startFeatures, dimFeatures, startLabels, dimLabels, bufSize);

    // if we have labels and labels are categorical values, we need a label Mapping file, it will be a file with one label per line
    if (m_labelType == labelCategory)
    {
        std::wstring labelPath = configLabels(L"labelMappingFile");
        if (fexists(labelPath))
        {
            // TODO: We use the old CNTK config reader for this. With BrainScript, we would have to parse the file locally here, which should be easy.
            ConfigArray arrayLabels;
            arrayLabels.LoadConfigFile(labelPath);
            for (int i = 0; i < arrayLabels.size(); ++i)
            {
                LabelType label = arrayLabels[i];
                m_mapIdToLabel[i] = label;
                m_mapLabelToId[label] = i;
            }
            m_labelIdMax = (LabelIdType) arrayLabels.size();
        }
        else
        {
            // only do label creation if we have the allow flag, should only be done as a separate command
            // to ensure that the label file will exist for verification step in training
            bool allowLabelCreation = readerConfig(L"allowMapCreation", false);
            if (allowLabelCreation)
                m_labelFileToWrite = labelPath;
            else
                RuntimeError("label mapping file %ls not found, can be created with a 'createLabelMap' command/action\n", labelPath.c_str());
        }

        // if the value they passed in as udim is not big enough, add something on
        if (udim < m_labelIdMax)
            udim = m_labelIdMax;
        m_labelDim = (LabelIdType)udim;
    }
    else
    {
        m_labelDim = (LabelIdType)dimLabels;
    }

    // if we know the size of the randomization now, resize, otherwise wait until we know the epochSize in StartMinibatchLoop()
    if (Randomize() && m_randomizeRange != randomizeAuto)
        m_randomordering.Resize(m_randomizeRange, m_randomizeRange);

    mOneLinePerFile = readerConfig(L"oneLinePerFile", false);
}

// InitCache - Initialize the caching reader if cache files exist, otherwise the writer
// readerConfig - reader configuration
template <class ElemType>
void UCIFastReader<ElemType>::InitCache(const ScriptableObjects::IConfigRecord& readerConfig)
{
    // check for a writer tag first (lets us know we are caching)
    if (readerConfig.Exists(L"writerType"))
        InvalidArgument("UCIFastReader: Caching ('writerType') is currently not supported for BrainScript.");
    // BrainScript cannot support this because of the manipulations of ConfigRecords, which the ScriptableObjects interface does not support (IConigRecords are immutable).
    // TODO: We could implement an overlay IConfigRecord implementation that fakes the two values that are being added to the interface.
    // BrainScript also cannot support this because a copy of the readerConfig is kept. IConfigRecords are not copyable.
    // TODO: We could copy the IConfigRecordPtr. That is allowed. Not trivial to do with template magic.
}
template <class ElemType>
void UCIFastReader<ElemType>::InitCache(const ConfigParameters& readerConfig)
{
    // first time we are called we cache our parameters
    // Note: This is a little ugly. It is an artifact of integrating with BrainScript. Since BrainScript does not allow to copy configs, I moved that copy code here into the ConfigParameters implementation.
    if (&readerConfig != &m_readerConfig)
        readerConfig.CopyTo(m_readerConfig);

    // check for a writer tag first (lets us know we are caching)
    if (!readerConfig.Exists(L"writerType"))
        return;

    // first try to open the binary cache
    bool found = false;
    try
    {
        // TODO: need to go down to all levels, maybe search for sectionType
        ConfigArray filesList(',');
        vector<std::wstring> names;
        if (readerConfig.Exists(L"wfile"))
        {
            filesList.push_back(readerConfig(L"wfile"));
            if (fexists(readerConfig(L"wfile")))
                found = true;
        }
        FindConfigNames(readerConfig, "wfile", names);
        for (const auto& name : names)
        {
            ConfigParameters config = readerConfig(name);
            filesList.push_back(config("wfile"));
            if (fexists(config("wfile")))
                found = true;
        }

        // if we have a file already, we are going to read the cached files
        if (found)
        {
            ConfigParameters config;
            readerConfig.CopyTo(config);
            // mmodify the config so the reader types look correct
            config["readerType"] = config("writerType");
            config["file"] = filesList;
            m_cachingReader = new DataReader(config);
        }
        else
        {
            m_cachingWriter = new DataWriter(readerConfig);

            // now get the section names for map and category types
            std::map<std::wstring, SectionType, nocase_compare> sections;
            m_cachingWriter->GetSections(sections);
            for (const auto& pair : sections)
            {
                if (pair.second == sectionTypeCategoryLabel)
                {
                    m_labelsCategoryName = pair.first;
                }
                else if (pair.second == sectionTypeLabelMapping)
                {
                    m_labelsMapName = pair.first;
                }
            }
        }
    }
    catch (runtime_error err)
    {
        // In case caching reader/writer cannot be created, we gracefully fail over and disable caching.
        fprintf(stderr, "Error attemping to create Binary%s\n%s\n", found ? "Reader" : "Writer", err.what());
        delete m_cachingReader;
        m_cachingReader = NULL;
        delete m_cachingWriter;
        m_cachingWriter = NULL;
    }
    catch (...)
    {
        // if there is any error, just get rid of the object
        fprintf(stderr, "Error attemping to create Binary%s\n", found ? "Reader" : "Writer");
        delete m_cachingReader;
        m_cachingReader = NULL;
        delete m_cachingWriter;
        m_cachingWriter = NULL;
    }
}

// destructor - virtual so it gets called properly
template <class ElemType>
UCIFastReader<ElemType>::~UCIFastReader()
{
    ReleaseMemory();
    delete m_cachingReader;
    delete m_cachingWriter;
}

// ReleaseMemory - release the memory footprint of UCIFastReader
// used when the caching reader is taking over
template <class ElemType>
void UCIFastReader<ElemType>::ReleaseMemory()
{
    m_featuresBuffer = NULL;
    m_labelsBuffer = NULL;
    m_labelsIdBuffer = NULL;
    m_featureData.clear();
    m_labelIdData.clear();
    m_labelData.clear();
}

//SetupEpoch - Setup the proper position in the file, and other variable settings to start a particular epoch
template <class ElemType>
void UCIFastReader<ElemType>::SetupEpoch()
{
    // if we are starting fresh (epoch zero and no data read), init everything
    // however if we are using cachingWriter, we need to know record count, so do that first
    if (m_epoch == 0 && m_totalSamples == 0 && m_cachingWriter != NULL)
    {
        m_readNextSample = m_epochStartSample = m_mbStartSample = 0;
        m_parser->SetFilePosition(0);
    }
    else // otherwise, position the read to start at the right location
    {
        // don't know the total number of samples yet, so count them
        if (m_totalSamples == 0)
        {
            if (m_traceLevel > 0)
                fprintf(stderr, "UCIFastReader: Starting at epoch %lu, counting lines to determine record count...\n", (unsigned long) m_epoch);
            m_parser->SetParseMode(ParseLineCount);
            m_totalSamples = m_parser->Parse(size_t(-1), NULL, NULL);
            m_parser->SetParseMode(ParseNormal);
            m_parser->SetFilePosition(0);
            m_mbStartSample = 0;
            UpdateDataVariables(0); // update all the variables since we read to the end...
            if (m_traceLevel > 0)
                fprintf(stderr, " %lu records found.\n", (unsigned long) m_totalSamples);
        }

        // make sure we are in the correct location for mid-dataset epochs
        size_t mbStartSample = m_epoch * m_epochSize;

        size_t fileRecord = m_totalSamples ? mbStartSample % m_totalSamples : 0;
        fprintf(stderr, "starting epoch %lu at record count %lu, and file position %lu\n", (unsigned long) m_epoch+1, (unsigned long) mbStartSample, (unsigned long) fileRecord);
        size_t currentFileRecord = m_mbStartSample % m_totalSamples;

        // reset the next read sample
        m_readNextSample = mbStartSample;
        if (currentFileRecord == fileRecord)
        {
            fprintf(stderr, "already there from last epoch\n");

            // we have a slight delima here, if we haven't determined the end of the file yet
            // and the user told us to find how many records are in the file, we can't distinguish "almost done"
            // with a file (a character away) and the middle of the file. So read ahead a record to see if it's there.
            bool endReached = m_endReached;
            if (!endReached)
            {
                if (!m_parser->HasMoreData())
                {
                    endReached = true;
                    UpdateDataVariables(mbStartSample);
                    assert(m_endReached);
                }
            }
            // move the read pointer to the end since we have everything already in memory.
            if (endReached && m_epochStartSample % m_totalSamples == fileRecord && m_featureData.size() >= m_epochSize * m_featureCount)
            {
                m_readNextSample = mbStartSample + m_epochSize;
                // write the label file here to make sure we do it somewhere. We know the entire dataset has been read at this point
                WriteLabelFile();
            }
        }
        // not the right position, need to get there
        else
        {
            // if we are already past the desired record, start at the beginning again
            if (currentFileRecord > fileRecord)
            {
                m_parser->SetFilePosition(0);
                currentFileRecord = 0;
            }
            fprintf(stderr, "reading from record %lu to %lu to be positioned properly for epoch\n", (unsigned long) currentFileRecord, (unsigned long) fileRecord);
            m_parser->SetParseMode(ParseLineCount);
            m_parser->Parse(fileRecord - currentFileRecord, NULL, NULL);
            m_parser->SetParseMode(ParseNormal);
            if (!m_labelFileToWrite.empty())
            {
                fprintf(stderr, "WARNING: file %ls NOT written to disk, label file will only be written when starting epochs at the beginning of the dataset\n", m_labelFileToWrite.c_str());
                m_labelFileToWrite.clear();
                RuntimeError("LabelMappingFile not provided in config, must be provided if not starting from epoch Zero (0)");
            }
        }
        m_epochStartSample = m_mbStartSample = mbStartSample;
    }
}

// utility function to round an integer up to a multiple of size
size_t RoundUp(size_t value, size_t size)
{
    return ((value + size - 1) / size) * size;
}

template <class ElemType>
void UCIFastReader<ElemType>::SetNumParallelSequences(const size_t sz)
{
    mRequestedNumParallelSequences = sz;
    if (mOneLinePerFile)
        m_mbSize = mRequestedNumParallelSequences;
};

//StartMinibatchLoop - Startup a minibatch loop
// mbSize - [in] size of the minibatch (number of Samples, etc.)
// epoch - [in] epoch number for this loop, if > 0 the requestedEpochSamples must be specified (unless epoch zero was completed this run)
// requestedEpochSamples - [in] number of samples to randomize, defaults to requestDataSize which uses the number of samples there are in the dataset
//   this value must be a multiple of mbSize, if it is not, it will be rounded up to one.
template <class ElemType>
void UCIFastReader<ElemType>::StartDistributedMinibatchLoop(size_t mbSize, size_t epoch, size_t subsetNum, size_t numSubsets, size_t requestedEpochSamples /*= requestDataSize */)
{
    m_subsetNum = subsetNum;
    m_numSubsets = numSubsets;
    if (mOneLinePerFile)
        mbSize = mRequestedNumParallelSequences; // each file has only one observation, therefore the number of data to read is the number of files

    // if we aren't currently caching, see if we can use a cache
    if (!m_cachingReader && !m_cachingWriter)
    {
        InitCache(m_readerConfig);
        if (m_cachingReader)
            ReleaseMemory(); // free the memory used by the UCIFastReader
    }

    // if we are reading from the cache, do so now and return
    if (m_cachingReader)
    {
        m_cachingReader->StartMinibatchLoop(mbSize, epoch, requestedEpochSamples);
        return;
    }

    // if we are reallocating bigger, release the original
    if (mbSize > m_mbSize)
    {
        m_featuresBuffer = NULL;
        m_labelsBuffer = NULL;
        m_labelsIdBuffer = NULL;
    }

    m_mbSize = mbSize;
    if (requestedEpochSamples == requestDataSize)
    {
        if (!m_endReached)
        {
            m_epochSize = requestDataSize;
        }
    }
    else
    {
        m_epochSize = requestedEpochSamples;
        if (!m_partialMinibatch)
            m_epochSize = RoundUp(requestedEpochSamples, mbSize);
        if (m_epochSize != requestedEpochSamples)
            fprintf(stderr, "epochSize rounded up to %d to fit an integral number of minibatches\n", (int) m_epochSize);
    }

    // set the randomization range for randomizationAuto
    // or if it's invalid less than the minibatch size, we need to make it at least minibatch size
    if (m_randomizeRange != randomizeNone)
    {
        if (m_epochSize != requestDataSize && m_randomizeRange == randomizeAuto)
        {
            m_randomizeRange = m_epochSize;
        }
        m_randomizeRange = max(m_randomizeRange, m_mbSize);
        if (m_randomizeRange != randomizeAuto)
        {
            if ((m_epochSize != requestDataSize && m_epochSize % m_randomizeRange != 0) || (m_randomizeRange % m_mbSize != 0))
                RuntimeError("randomizeRange must be an even multiple of mbSize and an integral factor of epochSize");
            m_randomordering.Resize(m_randomizeRange, m_randomizeRange);
        }
    }

    // we use epochSize, which might not be set yet, so use a default value for allocations if not yet set
    size_t epochSize = m_epochSize == requestDataSize ? 1000 : m_epochSize;
    m_epoch = epoch;
    m_mbStartSample = epoch * m_epochSize;

    // allocate room for the data
    m_featureData.reserve(m_featureCount * epochSize);
    if (m_labelType == labelCategory)
    { 
        m_labelIdData.reserve(epochSize);
        m_labelData.reserve(epochSize);
    }
    else if (m_labelType != labelNone)
        m_labelData.reserve(m_labelDim * epochSize);

    SetupEpoch();
}

// function to store the LabelType in an ElemType
// required for string labels, which can't be stored in ElemType arrays
template <class ElemType>
void UCIFastReader<ElemType>::StoreLabel(ElemType& labelStore, const LabelType& labelValue)
{
    labelStore = (ElemType) m_mapLabelToId[labelValue];
}

// GetMinibatch - Get the next minibatch (features and labels)
// matrices - [in] a map with named matrix types (i.e. 'features', 'labels') mapped to the corresponding matrix,
//             [out] each matrix resized if necessary containing data.
// returns - true if there are more minibatches, false if no more minibatches remain
template <class ElemType>
bool UCIFastReader<ElemType>::TryGetMinibatch(StreamMinibatchInputs& matrices)
{
    bool minibatchesRemaining = true;
    if (m_pendingAsyncGetMinibatch.valid())
    {
        // An async GetMinibatch is in flight. Wait for it to finish and swap
        // the contents of the m_prefetchedMatrices and parameter matrices
        minibatchesRemaining = m_pendingAsyncGetMinibatch.get();

        // Now swap the m_prefetchedMatrices and parameter matrices
        for (auto& iter : matrices)
        {
            if (m_prefetchMatrices.find(iter.first) == m_prefetchMatrices.end())
                LogicError("GetMinibatch: No matching prefetch matrix found for matrix named %ls.", iter.first.c_str());

            std::swap(matrices.GetInputMatrix<ElemType>(iter.first), m_prefetchMatrices.GetInputMatrix<ElemType>(iter.first)); // BUGBUG?: This swaps the matrix structures directly, messing with ownership. And are we sure it does not do deep copies for this?
            //Matrix<ElemType>* prefetchMatrix = m_prefetchMatrices[iter->first].get();
            //std::swap(*(iter->second), *prefetchMatrix);
        }
    }
    else
    {
        minibatchesRemaining = GetMinibatchImpl(matrices);

        // Allocate prefetch matrices if were firing an async minibatch prefetch
        if (minibatchesRemaining && m_prefetchEnabled)
        {
            // create a matrix for every output
            m_prefetchMatrices.clear();

            for (auto& iter : matrices)
                m_prefetchMatrices.AddInput(iter.first, make_shared<Matrix<ElemType>>(iter.second.matrix->GetDeviceId()), iter.second.pMBLayout, iter.second.sampleLayout);
        }
    }

    // Fire a new prefetch if there are any minibatches remaining
    if (minibatchesRemaining && m_prefetchEnabled)
    {
        Matrix<ElemType>& features = matrices.GetInputMatrix<ElemType>(m_featuresName);
        int deviceId = features.GetDeviceId();
        m_pendingAsyncGetMinibatch = std::async(std::launch::async, [this, deviceId]()
        {
            // Set the device since this will execute on a new thread
            Matrix<ElemType>::SetDevice(deviceId);

            StreamMinibatchInputs prefetchMatrices;
            for (auto& iter : m_prefetchMatrices)
                prefetchMatrices.AddInput(iter.first, iter.second);
            // TODO: Why can we not just pass m_prefetchMatrices?

            return GetMinibatchImpl(prefetchMatrices);
        });
    }

    return minibatchesRemaining;
}

// GetMinibatchImpl - The actual implementation of getting the next minibatch (features and labels)
// matrices - [in] a map with named matrix types (i.e. 'features', 'labels') mapped to the corresponding matrix,
//             [out] each matrix resized if necessary containing data.
// returns - true if there are more minibatches, false if no more minibatches remain
template <class ElemType>
bool UCIFastReader<ElemType>::GetMinibatchImpl(StreamMinibatchInputs& matrices)
{
    if (m_cachingReader)
    {
        return m_cachingReader->GetMinibatch(matrices);
    }
    // get the features array
    if (matrices.find(m_featuresName) == matrices.end())
        RuntimeError("Features matrix not found in config file, there should be a node '%ls' in the network.", m_featuresName.c_str());

    Matrix<ElemType>& features = matrices.GetInputMatrix<ElemType>(m_featuresName);

    // get out if they didn't call StartMinibatchLoop() first  --BUGBUG: We should throw in that case.
    if (m_mbSize == 0)
        return false;

    // check to see if we have changed epochs, if so we are done with this one.
    if (m_mbStartSample / m_epochSize != m_epoch)
        return false;

    bool randomize = Randomize();
    bool moreData = EnsureDataAvailable(m_mbStartSample);

    // figure which sweep of the randomization we are on
    size_t epochSample = m_mbStartSample % m_epochSize; // where the minibatch starts in this epoch
    // size_t samplesExtra = m_totalSamples % m_epochSize; // extra samples at the end of an epoch
    // size_t epochsDS = (m_totalSamples+m_epochSize-1)/m_epochSize; // how many epochs per dataset
    size_t randomizeSet = randomize ? RandomizeSweep(m_mbStartSample) : 0;
    const auto& tmap = m_randomordering(randomizeSet);
    size_t epochEnd = m_epochSize;
    size_t recordStart = m_totalSamples ? m_mbStartSample % m_totalSamples : m_mbStartSample;

    // actual size is either what requested, or total number of samples read so far
    size_t actualmbsize = min(m_totalSamples, m_mbSize); // it may still return less if at end of sweep

    // check for an odd sized last minibatch
    if (epochSample + actualmbsize > epochEnd)
    {
        actualmbsize = epochEnd - epochSample;
    }

    // hit the end of the dataset, we should only get here in "one=pass mode"
    if (!moreData)
    {
        // make sure we take into account hitting the end of the dataset (not wrapping around)
        actualmbsize = min(m_totalSamples - recordStart, actualmbsize);
    }

    if (m_featuresBuffer == NULL)
    {
        m_featuresBuffer = AllocateIntermediateBuffer(features.GetDeviceId(), m_mbSize * m_featureCount);
        memset(m_featuresBuffer.get(), 0, sizeof(ElemType) * m_mbSize * m_featureCount);
    }

    if (m_labelsBuffer == NULL)
    {
        if (m_labelType == labelCategory)
        {
            m_labelsBuffer = AllocateIntermediateBuffer(features.GetDeviceId(), m_labelDim * m_mbSize);
            m_labelsIdBuffer = std::shared_ptr<LabelIdType>(new LabelIdType[m_mbSize], [](LabelIdType* p)
                                                            {
                                                                delete[] p;
                                                            });
        }
        else if (m_labelType != labelNone)
        {
            m_labelsBuffer = AllocateIntermediateBuffer(features.GetDeviceId(), m_labelDim * m_mbSize);
            m_labelsIdBuffer = NULL;
        }
    }

    if (m_labelType == labelCategory)
    {
        memset(m_labelsBuffer.get(), 0, sizeof(ElemType) * m_labelDim * actualmbsize);
        memset(m_labelsIdBuffer.get(), 0, sizeof(LabelIdType) * actualmbsize);
    }
    else if (m_labelType != labelNone)
    {
        memset(m_labelsBuffer.get(), 0, sizeof(ElemType) * m_labelDim * actualmbsize);
    }

    if (actualmbsize > 0)
    {
        // loop through and copy data to matrix
        int j = 0; // vector of vectors of feature data
        // determine randomization base index
        size_t randBase = 0; // (keep compiler happy)
        if (randomize)
            randBase = epochSample - epochSample % m_randomizeRange;

        // loop through all the samples
        for (size_t jSample = m_mbStartSample; j < actualmbsize; ++j, ++jSample)
        {
            // pick the right sample with randomization if desired
            size_t jRand = randomize ? (randBase + tmap[jSample % m_randomizeRange]) : jSample;
            jRand %= m_epochSize; // BUGBUG: this will make it randomize only inside m_epochSize which is not enough 

            // vector of feature data goes into matrix column
            memcpy(&m_featuresBuffer.get()[j * m_featureCount], &m_featureData[jRand * m_featureCount], sizeof(ElemType) * m_featureCount);

            if (m_labelType == labelCategory)
            {
                m_labelsBuffer.get()[j * m_labelDim + m_labelIdData[jRand]] = (ElemType) 1;
                m_labelsIdBuffer.get()[j] = m_labelIdData[jRand];
            }
            else if (m_labelType != labelNone)
            {
                for (size_t ii = 0; ii < m_labelDim; ii++)
                {
                    ElemType v = (ElemType)atof(m_labelData[jRand*m_labelDim + ii].c_str());
                    m_labelsBuffer.get()[j*m_labelDim + ii] = v;
                }
                }
            }
        }

    // There may be multiple parallel trainers reading at the same time in which case
    // we will slice the data to only return the share of the current trainer's subset
    size_t currSubsetStartCol = (actualmbsize * m_subsetNum) / m_numSubsets;
    size_t currSubsetEndCol = (actualmbsize * (m_subsetNum + 1)) / m_numSubsets;
    size_t currSubsetSize = currSubsetEndCol - currSubsetStartCol;
    // create the respective MBLayout
    // Every sample is returned as a sequence of 1 frame.
    m_pMBLayout->InitAsFrameMode(currSubsetSize);

    // if we are writing out to the caching writer, do it now
    if (m_cachingWriter && (m_subsetNum == 0))
    {
        map<std::wstring, void*, nocase_compare> writeBuffer;
        writeBuffer[m_featuresName] = m_featuresBuffer.get();
        if (m_labelType == labelCategory)
        {
            writeBuffer[m_labelsName] = m_labelsIdBuffer.get();
            if (!m_labelsCategoryName.empty())
                writeBuffer[m_labelsCategoryName] = m_labelsBuffer.get();
        }
        else if (m_labelType != labelNone)
        {
            writeBuffer[m_labelsName] = m_labelsBuffer.get();
        }

        // write out the data, on a second pass compute statistics as needed
        bool moreToWrite = m_cachingWriter->SaveData(m_mbStartSample, writeBuffer, actualmbsize, m_totalSamples, 0);

        // done writing
        if (!moreToWrite)
        {
            // write out the mapping table as necessary
            if (m_labelType == labelCategory && !m_labelsMapName.empty())
            {
                m_cachingWriter->SaveMapping(m_labelsMapName, m_mapIdToLabel);
            }

            WriteLabelFile();

            // now close the cache writer
            delete m_cachingWriter;
            m_cachingWriter = NULL;
        }
    }

    // advance to the next minibatch
    m_mbStartSample += actualmbsize;

    // if they don't want partial minibatches, skip data transfer and return
    if (actualmbsize < m_mbSize && !m_partialMinibatch || actualmbsize == 0 || currSubsetSize == 0) // no records found (end of minibatch)
    {
        return false;
    }

    // now transfer to the GPU as needed
    features.SetValue(m_featureCount, currSubsetSize, features.GetDeviceId(), m_featuresBuffer.get() + (m_featureCount * currSubsetStartCol), matrixFlagNormal);
    if (m_labelType != labelNone)
    {
        if (matrices.HasInput(m_labelsName))
        {
            auto& labels = matrices.GetInputMatrix<ElemType>(m_labelsName);
            labels.SetValue(m_labelDim, currSubsetSize, labels.GetDeviceId(), m_labelsBuffer.get() + (m_labelDim * currSubsetStartCol), matrixFlagNormal);
        }
    }

    // we read some records, so process them
    return true;
}

// GetLabelMapping - Gets the label mapping from integer index to label type
// returns - a map from numeric datatype to native label type
template <class ElemType>
const std::map<IDataReader::LabelIdType, IDataReader::LabelType>& UCIFastReader<ElemType>::GetLabelMapping(const std::wstring& sectionName)
{
    if (m_cachingReader)
    {
        return m_cachingReader->GetLabelMapping(sectionName);
    }
    return m_mapIdToLabel;
}

// SetLabelMapping - Sets the label mapping from integer index to label
// labelMapping - mapping table from label values to IDs (must be 0-n)
// note: for tasks with labels, the mapping table must be the same between a training run and a testing run
template <class ElemType>
void UCIFastReader<ElemType>::SetLabelMapping(const std::wstring& /*sectionName*/, const std::map<LabelIdType, LabelType>& labelMapping)
{
    if (m_cachingReader)
    {
        RuntimeError("Cannot set mapping table when the caching reader is being used");
    }
    m_mapIdToLabel = labelMapping;
    m_mapLabelToId.clear();
    for (std::pair<unsigned, LabelType> var : labelMapping)
    {
        m_mapLabelToId[var.second] = var.first;
    }
}

// GetData - Gets metadata from the specified section (into CPU memory)
// sectionName - section name to retrieve data from
// numRecords - number of records to read
// data - pointer to data buffer, if NULL, dataBufferSize will be set to size of required buffer to accomidate request
// dataBufferSize - [in] size of the databuffer in bytes
//                  [out] size of buffer filled with data
// recordStart - record to start reading from, defaults to zero (start of data)
// returns: true if data remains to be read, false if the end of data was reached
template <class ElemType>
bool UCIFastReader<ElemType>::GetData(const std::wstring& sectionName, size_t numRecords, void* data, size_t& dataBufferSize, size_t recordStart)
{
    if (m_cachingReader)
    {
        return m_cachingReader->GetData(sectionName, numRecords, data, dataBufferSize, recordStart);
    }
    RuntimeError("GetData not supported in UCIFastReader");
}

template <class ElemType>
bool UCIFastReader<ElemType>::DataEnd()
{
    if (m_cachingReader)
        return m_cachingReader->DataEnd();
    else
        return true;
}

template <class ElemType>
unique_ptr<CUDAPageLockedMemAllocator>& UCIFastReader<ElemType>::GetCUDAAllocator(int deviceID)
{
    if (m_cudaAllocator != nullptr)
    {
        if (m_cudaAllocator->GetDeviceId() != deviceID)
        {
            m_cudaAllocator.reset(nullptr);
        }
    }

    if (m_cudaAllocator == nullptr)
    {
        m_cudaAllocator.reset(new CUDAPageLockedMemAllocator(deviceID));
    }

    return m_cudaAllocator;
}

template <class ElemType>
std::shared_ptr<ElemType> UCIFastReader<ElemType>::AllocateIntermediateBuffer(int deviceID, size_t numElements)
{
    if (deviceID >= 0)
    {
        // Use pinned memory for GPU devices for better copy performance
        size_t totalSize = sizeof(ElemType) * numElements;
        return std::shared_ptr<ElemType>((ElemType*) GetCUDAAllocator(deviceID)->Malloc(totalSize), [this, deviceID](ElemType* p)
                                         {
                                             this->GetCUDAAllocator(deviceID)->Free((char*) p);
                                         });
    }
    else
    {
        return std::shared_ptr<ElemType>(new ElemType[numElements], [](ElemType* p)
                                         {
                                             delete[] p;
                                         });
    }
}

// instantiate all the combinations we expect to be used
template class UCIFastReader<double>;
template class UCIFastReader<float>;
} } }
back to top