Raw File
PackerBase.h
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//

#pragma once

#include "Reader.h"
#include "MemoryProvider.h"
#include "SequenceEnumerator.h"
#include "Packer.h"
#include "CorpusDescriptor.h"

namespace CNTK {

// A base class for Packers.
class PackerBase : public Packer
{
protected:

    struct StreamBuffer
    {
        size_t m_size; // buffer size in bytes.
        MemoryProviderPtr m_memoryProvider;
        std::shared_ptr<char> m_data; // contiguous array of data.

        StreamBuffer(MemoryProviderPtr m_memoryProvider) :
            m_size(0), m_memoryProvider(m_memoryProvider), m_data(nullptr)
        {
        }

        void Resize(size_t newSize);
    };

    PackerBase(CorpusDescriptorPtr corpus,
               SequenceEnumeratorPtr sequenceEnumerator,
               const std::vector<StreamInformation>& streams,
               size_t numberOfBuffers);

    typedef std::vector<SequenceDataPtr> StreamBatch;

    size_t GetSampleSize(const StreamInformation& stream);

    std::vector<StreamInformation> GetStreamDescriptions() override
    {
        return m_outputStreamDescriptions;
    }

    // Packs a sparse sample as dense:
    //  - 0-fills a region of sampleSize bytes in the block of memory pointed to by destination;
    //  - copies non-zero values of the required sample (given by sampleIndex) from the data
    // portion of the source sequence to the destination block of memory, where each value is placed
    // at the offset equal to value index * elementSize. sampleOffset specifies the offset of the
    // first value from the given sample in the sequence data/indices array (sampleOffset is equal
    // to the sum of non-zero value counts of all preceding samples).
    void PackSparseSampleAsDense(char* destination, SparseSequenceDataPtr sequence,
        size_t sampleIndex, size_t sampleOffset, size_t sampleSize, size_t elementSize);

    // Packs a dense sample as dense. Copies sampleSize bytes staring at the sampleOffset from 
    // the data portion of the source sequence to the destination block of memory. sampleOffset 
    // specifies the offset of the first value from the given sample in the sequence data/ array 
    // (sampleOffset is equal to the sum of sample sizes of all preceding samples).
    void PackDenseSample(char* destination, SequenceDataPtr sequence, size_t sampleOffset, size_t sampleSize);

    // Establishes a mapping between id inside the mb layout and the global key in the corpus.
    // Assumes the sequences inside MBLayout have the same order as Sequences.
    void EstablishIdToKey(Minibatch& minibatch, const Sequences& sequences);

    static void CheckNameUniqueness(const std::vector<StreamInformation>& streams);

    SequenceEnumeratorPtr m_sequenceEnumerator;

    // Input stream descriptions provided by the transformer.
    std::vector<StreamInformation> m_outputStreamDescriptions;

    // Output stream descriptions expected by the network.
    std::vector<StreamInformation> m_inputStreamDescriptions;

    // Indicates how many internal buffers with pinned memory are supported.
    // If N - then N sequential calls to PackMinibatch are valid, and N+1 call will overwrite 
    // the memory of the first call.
    size_t m_numberOfBuffers;

    // Buffers for allocated data. Outer vector size == m_numberOfBuffers, 
    // inner vector contains buffers for all streams.
    std::vector<std::vector<StreamBuffer>> m_streamBuffers;

    // Cyclic index of the current buffer. m_currentBufferIndex < m_numberOfBuffers;
    size_t m_currentBufferIndex;

    // For which streams there should be a shape check for each sequence.
    std::vector<bool> m_checkSampleShape;

    // Memory providers. Each stream has its own memory provider.
    std::vector<MemoryProviderPtr> m_memoryProviders;

    // Current config.
    ReaderConfiguration m_config;

    CorpusDescriptorPtr m_corpus;

public:
    // Sets current epoch configuration.
    virtual void SetConfiguration(const ReaderConfiguration& config, const std::vector<MemoryProviderPtr>& memoryProviders) override;
};

inline void PackerBase::PackSparseSampleAsDense(char* destination, SparseSequenceDataPtr sequence,
    size_t sampleIndex, size_t sampleOffset, size_t sampleSize, size_t elementSize)
{
    //The sample is sparse, first, need to zero out the buffer.
    memset(destination, 0, sampleSize);
    // Get the nnz count of the sample.
    size_t nonZeroCount = sequence->m_nnzCounts[sampleIndex];
    // In a sparse sequence, m_data points to the array of non zero elements,
    // m_indices stores the corresponding indices for each element. 
    // Iterate through non zero elements and copy from m_data them into the 
    // destination at the offset given by the corresponding row index (m_index).
    const void* buffer = sequence->GetDataBuffer();
    for (size_t nonZeroIndex = 0; nonZeroIndex < nonZeroCount; ++nonZeroIndex)
    {
        auto sourceOffset = sampleOffset + nonZeroIndex;
        auto elementIndex = sequence->m_indices[sourceOffset];
        auto destinationOffset = elementIndex * elementSize;
        assert(destinationOffset < sampleSize);
        const auto* source = (const char*)buffer + (sourceOffset)* elementSize;
        memcpy(destination + destinationOffset, source, elementSize);
    }
}

inline void PackerBase::PackDenseSample(char* destination, SequenceDataPtr sequence, size_t sampleOffset, size_t sampleSize)
{
    // Because the sample is dense - simply copying it to the output.
    memcpy(destination, (const char*)(sequence->GetDataBuffer()) + sampleOffset, sampleSize);
}

}
back to top