https://github.com/Microsoft/CNTK
Raw File
Tip revision: ad1f8bc52085abdc4a6cfe14d508ad3945698649 authored by Bowen Bao on 31 July 2018, 22:03:09 UTC
Update conv input space filled by kernel check.
Tip revision: ad1f8bc
LibSVMBinaryReader.h
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
// LibSVMBinaryReader.h - Declarations for LibSVMBinaryReader and its helper classes (BlockingQueue, BinaryMatrix, SparseBinaryInput)
//
#pragma once
#include "stdafx.h"
#include "DataReader.h"
#include "DataWriter.h"
#include "RandomOrdering.h"
#include <string>
#include <map>
#include <vector>
#include <random>
#include <future>
#include <mutex>
#include <condition_variable>
#include <deque>
#include <thread>
#if DEBUG
#include <cvmarkersobj.h>
using namespace Concurrency::diagnostic;
#endif

namespace Microsoft { namespace MSR { namespace CNTK {

template <typename T>
class BlockingQueue
{
private:
    std::mutex d_mutex;
    std::condition_variable d_condition;
    std::deque<T> d_queue;

public:
    // Passes every element still queued to free() and leaves the queue empty.
    // Only meaningful for queues of malloc'ed pointers (e.g. BlockingQueue<void*>).
    void release()
    {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        while (!d_queue.empty())
        {
            // pop_front() returns void, so read the element before removing it.
            free(d_queue.front());
            d_queue.pop_front();
        }
    }

    // Number of queued elements. Taken under the lock so it does not race with push/pop;
    // the value may of course be stale by the time the caller uses it.
    size_t size()
    {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        return d_queue.size();
    }

    // Enqueues `value` and wakes one thread blocked in pop().
    void push(T const& value)
    {
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            d_queue.push_front(value);
        }
        // Notify after releasing the lock so the woken thread can acquire it immediately.
        this->d_condition.notify_one();
    }

    // Blocks until an element is available, then removes and returns the oldest one
    // (push inserts at the front, pop takes from the back => FIFO order).
    T pop()
    {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        this->d_condition.wait(lock, [this] { return !this->d_queue.empty(); });
        T rc(std::move(this->d_queue.back()));
        this->d_queue.pop_back();
        return rc;
    }
};

template <class ElemType>
class BinaryMatrix
{
public:
    // Creates an empty buffer named `name` for device `deviceID`.
    // numRows / numCols are stored as the maximum row / column counts; the current
    // row count starts at 0 and is advanced by UpdateCurMB().
    // Initializers are listed in member-declaration order (avoids -Wreorder).
    BinaryMatrix(wstring name, int deviceID, size_t numRows, size_t numCols)
        : m_matrixName(name), m_deviceID(deviceID), m_values(nullptr), m_maxNumRows(numRows), m_maxNumCols(numCols), m_numRows(0)
    {
    }

    // Instances are owned through shared_ptr<BinaryMatrix<ElemType>>; a virtual destructor keeps
    // destruction through a base pointer well-defined no matter how the object was created.
    virtual ~BinaryMatrix() = default;

    virtual void Clear() = 0;
    virtual void Dispose() = 0;
    // Presumably copies the buffered data into `matrix`; implemented by the dense/sparse subclasses.
    virtual void Fill(Matrix<ElemType>*) = 0;
    virtual void AddValues(void*, size_t) = 0;
    virtual void AddColIndices(void*, size_t) = 0;
    virtual void AddRowIndices(void*, size_t) = 0;
    virtual void UpdateNNz(size_t) = 0;

    // Advances the current row count by `mb`.
    virtual void UpdateCurMB(size_t mb)
    {
        m_numRows += mb;
    }

    virtual void ResizeArrays(size_t) = 0;
    virtual void SetMaxRows(size_t maxRows) = 0;

protected:
    wstring m_matrixName;
    int m_deviceID;
    std::shared_ptr<ElemType> m_values;
    size_t m_maxNumRows;
    size_t m_maxNumCols;

    size_t m_numRows; // current row count, advanced by UpdateCurMB()
};

template <class ElemType>
class DenseBinaryMatrix : public BinaryMatrix<ElemType>
{
public:
    DenseBinaryMatrix(wstring name, int deviceID, size_t numRows, size_t numCols);

    // All overriders are marked `override` so a signature drift in BinaryMatrix is a compile error.
    virtual void Clear() override;
    virtual void Dispose() override;
    virtual void Fill(Matrix<ElemType>* matrix) override;
    virtual void AddValues(void* values, size_t numRows) override;
    virtual void SetMaxRows(size_t maxRows) override;

    // The index/nnz hooks below are not implemented for the dense layout.
    virtual void AddColIndices(void* /*colIndices*/, size_t /*numCols*/) override
    {
        NOT_IMPLEMENTED
    }
    virtual void AddRowIndices(void* /*rowIndices*/, size_t /*nnz*/) override
    {
        NOT_IMPLEMENTED
    }
    virtual void UpdateNNz(size_t /*nnz*/) override
    {
        NOT_IMPLEMENTED
    }
    virtual void ResizeArrays(size_t) override
    {
        NOT_IMPLEMENTED
    }
};

template <class ElemType>
class SparseBinaryMatrix : public BinaryMatrix<ElemType>
{
    typedef BinaryMatrix<ElemType> Base;
    // Bring dependent-base members into scope (two-phase lookup would not find them otherwise).
    using Base::m_values;
    using Base::m_numRows;
    using Base::m_deviceID;

public:
    SparseBinaryMatrix(wstring name, int deviceID, size_t numRows, size_t numCols);

    virtual void Clear() override;
    virtual void Dispose() override;
    virtual void Fill(Matrix<ElemType>* matrix) override;
    virtual void AddValues(void* values, size_t nnz) override;
    virtual void AddColIndices(void* colIndices, size_t numCols) override;
    virtual void AddRowIndices(void* rowIndices, size_t nnz) override;

    // Adds `nnz` to the running non-zero count.
    virtual void UpdateNNz(size_t nnz) override
    {
        m_nnz += nnz;
    }

    virtual void ResizeArrays(size_t newMaxNNz) override;
    virtual void SetMaxRows(size_t maxRows) override;

protected:
    std::shared_ptr<int32_t> m_rowIndices;
    std::shared_ptr<int32_t> m_colIndices;
    // Default-initialized so UpdateNNz() never accumulates onto an indeterminate value;
    // a constructor mem-initializer (if any) still takes precedence.
    size_t m_nnz = 0;
    size_t m_maxNNz = 0; // presumably the capacity managed by ResizeArrays() — confirm in the .cpp
};

template <class ElemType>
class SparseBinaryInput
{
public:
    SparseBinaryInput(std::wstring fileName);
    // Virtual because the class has virtual members (DataEnd) and may be used polymorphically.
    virtual ~SparseBinaryInput();
    void Init(std::map<std::wstring, std::wstring> rename);
    void StartDistributedMinibatchLoop(size_t mbSize, size_t subsetNum, size_t numSubsets);
    void ReadMinibatches(size_t* read_order, size_t numToRead);
    size_t ReadMinibatch(void* data_buffer, std::map<std::wstring, shared_ptr<BinaryMatrix<ElemType>>>& matrices);
    size_t FillMatrices(std::map<std::wstring, shared_ptr<BinaryMatrix<ElemType>>>& matrices);

    size_t GetMBSize() const
    {
        return m_mbSize;
    }

    // Number of requested minibatches available: the stored micro-batch count divided by
    // the number of micro-batches making up one minibatch. When that ratio is 0
    // (m_mbSize < m_microBatchSize) or the micro-batch size is non-positive, each minibatch
    // is treated as consuming one micro-batch instead of dividing by zero.
    size_t GetNumMB() const
    {
        const size_t microBatchesPerMB = m_microBatchSize > 0 ? m_mbSize / static_cast<size_t>(m_microBatchSize) : 0;
        if (microBatchesPerMB == 0)
            return m_numBatches;
        return m_numBatches / microBatchesPerMB;
    }

    void Shuffle();
    shared_ptr<BinaryMatrix<ElemType>> CreateMatrix(std::wstring matName, int deviceId);
    virtual bool DataEnd();

private:
    void ReadOffsets(size_t startMB, size_t numMBs);
    void FillReadOrder(size_t windowSize);
    void* GetTempDataPointer(size_t numVals);
    bool Randomize();

    // Scalars and raw pointers get defaults so the destructor is safe even if the
    // constructor/Init never assigned them; constructor mem-initializers still take precedence.
    ifstream m_inFile;
    std::wstring m_fileName;
    size_t m_fileSize = 0;

    size_t m_offsetsStart = 0;
    int64_t* m_offsets = nullptr;
    size_t m_dataStart = 0;

    size_t m_nextMB = 0;    // starting sample # of the next minibatch
    size_t m_epochSize = 0; // size of an epoch

    size_t m_numRows = 0;    // NOTE(review): previously documented as "size of minibatch requested", which duplicates m_mbSize — confirm meaning
    size_t m_numBatches = 0; // appears to count stored micro-batches; GetNumMB() divides it by micro-batches per minibatch

    int32_t m_microBatchSize = 0;
    size_t m_mbSize = 0;

    size_t m_startMB = 0;
    size_t m_endMB = 0;
    size_t m_curLower = 0;

    size_t m_subsetNum = 0;
    size_t m_numSubsets = 0;

    size_t m_windowSize = 0;
    size_t m_curWindowSize = 0;

    bool m_randomize = false;
    size_t* m_readOrder = nullptr; // array to shuffle to reorder the dataset
    size_t m_readOrderLength = 0;
    size_t m_maxMBSize = 0;

    std::vector<std::wstring> m_features;
    std::vector<std::wstring> m_labels;
    std::map<std::wstring, int32_t> m_mappedNumCols;

    int32_t m_tempValuesSize = 0;
    void* m_tempValues = nullptr;

    RandomOrdering m_randomordering; // randomizing class
    std::mt19937_64 m_randomEngine;
#ifdef _WIN32
    DWORD sysGran = 0;
#else
    int32_t sysGran = 0;
#endif
    BlockingQueue<void*> m_dataToProduce;
    BlockingQueue<void*> m_dataToConsume;
};

template <class ElemType>
class LibSVMBinaryReader : public DataReaderBase
{
public:
    virtual void Init(const ConfigParameters& config) override
    {
        InitFromConfig(config);
    }
    virtual void Init(const ScriptableObjects::IConfigRecord& config) override
    {
        InitFromConfig(config);
    }

    template <class ConfigRecordType>
    void InitFromConfig(const ConfigRecordType&);

    virtual void Destroy();

    // Creates the reader with its own MBLayout; all other state is set up by Init().
    LibSVMBinaryReader()
        : m_dssmLabels(nullptr), DSSMCols(0)
    {
        m_pMBLayout = make_shared<MBLayout>();
        m_pMBLayout->SetUniqueAxisName(L"LibSVMReader");
    }

    virtual ~LibSVMBinaryReader();

    virtual void StartMinibatchLoop(size_t mbSize, size_t epoch, size_t requestedEpochSamples = requestDataSize);
    virtual void StartDistributedMinibatchLoop(size_t mbSize, size_t epoch, size_t subsetNum, size_t numSubsets, size_t requestedEpochSamples) override;
    virtual bool TryGetMinibatch(StreamMinibatchInputs& matrices);

    virtual bool SupportsDistributedMBRead() const override
    {
        return true;
    }

    template <class ConfigRecordType>
    void RenamedMatrices(const ConfigRecordType& readerConfig, std::map<std::wstring, std::wstring>& rename);

    virtual void SetLabelMapping(const std::wstring& /*sectionName*/, const std::map<LabelIdType, LabelType>& /*labelMapping*/)
    {
        NOT_IMPLEMENTED
    }
    virtual bool GetData(const std::wstring& /*sectionName*/, size_t /*numRecords*/, void* /*data*/, size_t& /*dataBufferSize*/, size_t /*recordStart = 0*/)
    {
        NOT_IMPLEMENTED
    }
    virtual bool DataEnd();

    size_t GetNumParallelSequencesForFixingBPTTMode()
    {
        return m_pMBLayout->GetNumParallelSequences();
    }
    void CopyMBLayoutTo(MBLayoutPtr pMBLayout)
    {
        pMBLayout->CopyFrom(m_pMBLayout);
    }

    size_t NumberSlicesInEachRecurrentIter()
    {
        return 1;
    }
    void SetNbrSlicesEachRecurrentIter(const size_t)
    {
    }
    void SetSentenceEndInBatch(std::vector<size_t>& /*sentenceEnd*/)
    {
    }

private:
    // Scalar members get defaults so they are never read indeterminate before Init() assigns them.
#if DEBUG
    marker_series* reader_series = nullptr;
    size_t cur_read = 0;
#endif
    clock_t timer = 0;
    void DoDSSMMatrix(Matrix<ElemType>& mat, size_t actualMBSize);

    void CheckDataMatrices(StreamMinibatchInputs& matrices);
    MBLayoutPtr m_pMBLayout;
    ConfigParameters m_readerConfig;

    std::shared_ptr<SparseBinaryInput<ElemType>> m_dataInput;

    std::map<std::wstring, shared_ptr<BinaryMatrix<ElemType>>> m_dataMatrices;

    unsigned long m_randomize = 0; // randomization range

    std::shared_ptr<ElemType> m_dssmLabels;
    size_t DSSMCols;

    size_t m_mbSize = 0; // size of minibatch requested

    size_t m_requestedEpochSize = 0; // size of an epoch

    size_t m_epoch = 0; // which epoch are we on

    bool m_partialMinibatch = false; // a partial minibatch is allowed

    bool m_prefetchEnabled = false;
    std::future<size_t> m_pendingAsyncGetMinibatch;
};
} } }
back to top