swh:1:snp:f50ab94432af916b5fb8b4ad831e8dddded77084
Raw File
Tip revision: eb43ecd1f6b53d0b567f6dab21e1a112de90593b authored by Tim Dettmers on 29 June 2016, 21:31:00 UTC
Merge branch 't-tidett/ExtendedCUDAErrors' of https://github.com/Microsoft/CNTK into t-tidett/ExtendedCUDAErrors
Tip revision: eb43ecd
Indexer.cpp
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//

#include "stdafx.h"
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include "Indexer.h"
#include "TextReaderConstants.h"

using std::string;

namespace Microsoft { namespace MSR { namespace CNTK {

Indexer::Indexer(FILE* file, bool skipSequenceIds, size_t chunkSize) :
    m_file(file),
    m_fileOffsetStart(0),
    m_fileOffsetEnd(0),
    m_buffer(new char[BUFFER_SIZE + 1]),
    m_bufferStart(nullptr),
    m_bufferEnd(nullptr),
    m_pos(nullptr),
    m_done(false),
    m_hasSequenceIds(!skipSequenceIds),
    m_index(chunkSize)
{
    if (m_file == nullptr)
    {
        RuntimeError("Input file not open for reading");
    }
}

void Indexer::RefillBuffer()
{
    if (!m_done)
    {
        size_t bytesRead = fread(m_buffer.get(), 1, BUFFER_SIZE, m_file);
        if (bytesRead == (size_t)-1)
            RuntimeError("Could not read from the input file.");
        if (bytesRead == 0)
        {
            m_done = true;
        }
        else
        {
            m_fileOffsetStart = m_fileOffsetEnd;
            m_fileOffsetEnd += bytesRead;
            m_bufferStart = m_buffer.get();
            m_pos = m_bufferStart;
            m_bufferEnd = m_bufferStart + bytesRead;
        }
    }
}

void Indexer::BuildFromLines(CorpusDescriptorPtr corpus)
{
    assert(m_pos == m_bufferStart);
    m_hasSequenceIds = false;
    size_t lines = 0;
    int64_t offset = GetFileOffset();
    while (!m_done)
    {
        m_pos = (char*)memchr(m_pos, ROW_DELIMITER, m_bufferEnd - m_pos);
        if (m_pos)
        {
            SequenceDescriptor sd = {};
            sd.m_numberOfSamples = 1;
            sd.m_fileOffsetBytes = offset;
            offset = GetFileOffset() + 1;
            sd.m_byteSize = offset - sd.m_fileOffsetBytes;
            AddSequenceIfIncluded(corpus, lines, sd);
            ++m_pos;
            ++lines;
        }
        else
        {
            RefillBuffer();
        }
    }

    if (offset < m_fileOffsetEnd)
    {
        // There's a number of characters, not terminated by a newline,
        // add a sequence to the index, parser will have to deal with it.
        SequenceDescriptor sd = {};
        sd.m_numberOfSamples = 1;
        sd.m_fileOffsetBytes = offset;
        sd.m_byteSize = m_fileOffsetEnd - sd.m_fileOffsetBytes;
        AddSequenceIfIncluded(corpus, lines, sd);
    }
}

void Indexer::Build(CorpusDescriptorPtr corpus)
{
    if (!m_index.IsEmpty())
    {
        return;
    }

    m_index.Reserve(filesize(m_file));

    RefillBuffer(); // read the first block of data
    if (m_done)
    {
        RuntimeError("Input file is empty");
    }

    if ((m_bufferEnd - m_bufferStart > 3) &&
        (m_bufferStart[0] == '\xEF' && m_bufferStart[1] == '\xBB' && m_bufferStart[2] == '\xBF'))
    {
        // input file contains UTF-8 BOM value, skip it.
        m_pos += 3;
        m_fileOffsetStart += 3;
        m_bufferStart += 3;
    }

    // check the first byte and decide what to do next
    if (!m_hasSequenceIds || m_bufferStart[0] == NAME_PREFIX)
    {
        // skip sequence id parsing, treat lines as individual sequences
        BuildFromLines(corpus);
        return;
    }

    size_t id = 0;
    int64_t offset = GetFileOffset();
    // read the very first sequence id
    if (!TryGetSequenceId(id))
    {
        RuntimeError("Expected a sequence id at the offset %" PRIi64 ", none was found.", offset);
    }

    SequenceDescriptor sd = {};
    sd.m_fileOffsetBytes = offset;

    size_t currentKey = id;
    while (!m_done)
    {
        SkipLine(); // ignore whatever is left on this line.
        offset = GetFileOffset(); // a new line starts at this offset;
        sd.m_numberOfSamples++;

        if (!m_done && TryGetSequenceId(id) && id != currentKey)
        {
            // found a new sequence, which starts at the [offset] bytes into the file
            sd.m_byteSize = offset - sd.m_fileOffsetBytes;
            AddSequenceIfIncluded(corpus, currentKey, sd);

            sd = {};
            sd.m_fileOffsetBytes = offset;
            currentKey = id;
        }
    }

    // calculate the byte size for the last sequence
    sd.m_byteSize = m_fileOffsetEnd - sd.m_fileOffsetBytes;
    AddSequenceIfIncluded(corpus, currentKey, sd);
}

void Indexer::AddSequenceIfIncluded(CorpusDescriptorPtr corpus, size_t sequenceKey, SequenceDescriptor& sd)
{
    auto& stringRegistry = corpus->GetStringRegistry();
    auto key = std::to_string(sequenceKey);
    if (corpus->IsIncluded(key))
    {
        sd.m_key.m_sequence = stringRegistry[key];
        sd.m_key.m_sample = 0;
        m_index.AddSequence(sd);
    }
}

void Indexer::SkipLine()
{
    while (!m_done)
    {
        m_pos = (char*)memchr(m_pos, ROW_DELIMITER, m_bufferEnd - m_pos);
        if (m_pos)
        {
            //found a new-line character
            if (++m_pos == m_bufferEnd)
            {
                RefillBuffer();
            }
            return;
        }
        RefillBuffer();
    }
}

bool Indexer::TryGetSequenceId(size_t& id)
{
    bool found = false;
    id = 0;
    while (!m_done)
    {
        while (m_pos != m_bufferEnd)
        {
            char c = *m_pos;

            if (!isdigit(c))
            {
                // Stop as soon as there's a non-digit character
                return found;
            }

            found |= true;
            id = id * 10 + (c - '0');
            ++m_pos;
        }
        RefillBuffer();
    }

    // reached EOF without hitting the pipe character,
    // ignore it for not, parser will have to deal with it.
    return false;
}

}}}
back to top