Revision 7103559f49b46b3287973045f741c0679e3e9e44 authored by Sagar Vemuri on 21 June 2018, 18:02:49 UTC, committed by Facebook Github Bot on 21 June 2018, 18:13:08 UTC
Summary:
This PR extends the improvements in #3282 to also work when using Direct IO.
We see **4.5X performance improvement** in seekrandom benchmark doing long range scans, when using direct reads, on flash.

**Description:**
This change improves the performance of iterators doing long range scans (e.g. big/full index or table scans in MyRocks) by using readahead and prefetching additional data on each disk IO, and storing in a local buffer. This prefetching is automatically enabled on noticing more than 2 IOs for the same table file during iteration. The readahead size starts with 8KB and is exponentially increased on each additional sequential IO, up to a max of 256 KB. This helps in cutting down the number of IOs needed to complete the range scan.

**Implementation Details:**
- Used `FilePrefetchBuffer` as the underlying buffer to store the readahead data. `FilePrefetchBuffer` can now take file_reader, readahead_size and max_readahead_size as input to the constructor, and automatically do readahead.
- `FilePrefetchBuffer::TryReadFromCache` can now call `FilePrefetchBuffer::Prefetch` if readahead is enabled.
- `AlignedBuffer` (which is the underlying store for `FilePrefetchBuffer`) now takes a few additional args in `AlignedBuffer::AllocateNewBuffer` to allow copying data from the old buffer.
- Made sure not to re-read partial chunks of data that were already available in the buffer, from device again.
- Fixed a couple of cases where `AlignedBuffer::cursize_` was not being properly kept up-to-date.

**Constraints:**
- Similar to #3282, this gets currently enabled only when ReadOptions.readahead_size = 0 (which is the default value).
- Since the prefetched data is stored in a temporary buffer allocated on heap, this could increase the memory usage if you have many iterators doing long range scans simultaneously.
- Enabled only for user reads, and disabled for compactions. Compaction reads are controlled by the options `use_direct_io_for_flush_and_compaction` and `compaction_readahead_size`, and the current feature takes precautions not to mess with them.

**Benchmarks:**
I used the same benchmark as used in #3282.
Data fill:
```
TEST_TMPDIR=/data/users/$USER/benchmarks/iter ./db_bench -benchmarks=fillrandom -num=1000000000 -compression_type="none" -level_compaction_dynamic_level_bytes
```

Do a long range scan: Seekrandom with large number of nexts
```
TEST_TMPDIR=/data/users/$USER/benchmarks/iter ./db_bench -benchmarks=seekrandom -use_direct_reads -duration=60 -num=1000000000 -use_existing_db -seek_nexts=10000 -statistics -histogram
```

```
Before:
seekrandom   :   37939.906 micros/op 26 ops/sec;   29.2 MB/s (1636 of 1999 found)
With this change:
seekrandom   :   8527.720 micros/op 117 ops/sec;  129.7 MB/s (6530 of 7999 found)
```
~4.5X perf improvement. Taken on an average of 3 runs.
Closes https://github.com/facebook/rocksdb/pull/3884

Differential Revision: D8082143

Pulled By: sagar0

fbshipit-source-id: 4d7a8561cbac03478663713df4d31ad2620253bb
1 parent 524c6e6
Raw File
error_filter.py
#  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
#  This source code is licensed under both the GPLv2 (found in the
#  COPYING file in the root directory) and Apache 2.0 License
#  (found in the LICENSE.Apache file in the root directory).

'''Filter for error messages in test output:
    - Receives merged stdout/stderr from test on stdin
    - Finds patterns of known error messages for test name (first argument)
    - Prints those error messages to stdout
'''

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import re
import sys


class ErrorParserBase(object):
    def parse_error(self, line):
        '''Parses a line of test output. If it contains an error, returns a
        formatted message describing the error; otherwise, returns None.
        Subclasses must override this method.
        '''
        raise NotImplementedError


class GTestErrorParser(ErrorParserBase):
    '''A parser that remembers the last test that began running so it can print
    that test's name upon detecting failure.
    '''
    _GTEST_NAME_PATTERN = re.compile(r'\[ RUN      \] (\S+)$')
    # format: '<filename or "unknown file">:<line #>: Failure'
    _GTEST_FAIL_PATTERN = re.compile(r'(unknown file|\S+:\d+): Failure$')

    def __init__(self):
        self._last_gtest_name = 'Unknown test'

    def parse_error(self, line):
        gtest_name_match = self._GTEST_NAME_PATTERN.match(line)
        if gtest_name_match:
            self._last_gtest_name = gtest_name_match.group(1)
            return None
        gtest_fail_match = self._GTEST_FAIL_PATTERN.match(line)
        if gtest_fail_match:
            return '%s failed: %s' % (
                    self._last_gtest_name, gtest_fail_match.group(1))
        return None


class MatchErrorParser(ErrorParserBase):
    '''A simple parser that returns the whole line if it matches the pattern.
    '''
    def __init__(self, pattern):
        self._pattern = re.compile(pattern)

    def parse_error(self, line):
        if self._pattern.match(line):
            return line
        return None


class CompilerErrorParser(MatchErrorParser):
    def __init__(self):
        # format: '<filename>:<line #>:<column #>: error: <error msg>'
        super(CompilerErrorParser, self).__init__(r'\S+:\d+:\d+: error:')


class ScanBuildErrorParser(MatchErrorParser):
    def __init__(self):
        super(ScanBuildErrorParser, self).__init__(
                r'scan-build: \d+ bugs found.$')


class DbCrashErrorParser(MatchErrorParser):
    def __init__(self):
        super(DbCrashErrorParser, self).__init__(r'\*\*\*.*\^$|TEST FAILED.')


class WriteStressErrorParser(MatchErrorParser):
    def __init__(self):
        super(WriteStressErrorParser, self).__init__(
                r'ERROR: write_stress died with exitcode=\d+')


class AsanErrorParser(MatchErrorParser):
    def __init__(self):
        super(AsanErrorParser, self).__init__(
                r'==\d+==ERROR: AddressSanitizer:')


class UbsanErrorParser(MatchErrorParser):
    def __init__(self):
        # format: '<filename>:<line #>:<column #>: runtime error: <error msg>'
        super(UbsanErrorParser, self).__init__(r'\S+:\d+:\d+: runtime error:')


class ValgrindErrorParser(MatchErrorParser):
    def __init__(self):
        # just grab the summary, valgrind doesn't clearly distinguish errors
        # from other log messages.
        super(ValgrindErrorParser, self).__init__(r'==\d+== ERROR SUMMARY:')


class CompatErrorParser(MatchErrorParser):
    def __init__(self):
        super(CompatErrorParser, self).__init__(r'==== .*[Ee]rror.* ====$')


class TsanErrorParser(MatchErrorParser):
    def __init__(self):
        super(TsanErrorParser, self).__init__(r'WARNING: ThreadSanitizer:')


_TEST_NAME_TO_PARSERS = {
    'punit': [CompilerErrorParser, GTestErrorParser],
    'unit': [CompilerErrorParser, GTestErrorParser],
    'release': [CompilerErrorParser, GTestErrorParser],
    'unit_481': [CompilerErrorParser, GTestErrorParser],
    'release_481': [CompilerErrorParser, GTestErrorParser],
    'clang_unit': [CompilerErrorParser, GTestErrorParser],
    'clang_release': [CompilerErrorParser, GTestErrorParser],
    'clang_analyze': [CompilerErrorParser, ScanBuildErrorParser],
    'code_cov': [CompilerErrorParser, GTestErrorParser],
    'unity': [CompilerErrorParser, GTestErrorParser],
    'lite': [CompilerErrorParser],
    'lite_test': [CompilerErrorParser, GTestErrorParser],
    'stress_crash': [CompilerErrorParser, DbCrashErrorParser],
    'write_stress': [CompilerErrorParser, WriteStressErrorParser],
    'asan': [CompilerErrorParser, GTestErrorParser, AsanErrorParser],
    'asan_crash': [CompilerErrorParser, AsanErrorParser, DbCrashErrorParser],
    'ubsan': [CompilerErrorParser, GTestErrorParser, UbsanErrorParser],
    'ubsan_crash': [CompilerErrorParser, UbsanErrorParser, DbCrashErrorParser],
    'valgrind': [CompilerErrorParser, GTestErrorParser, ValgrindErrorParser],
    'tsan': [CompilerErrorParser, GTestErrorParser, TsanErrorParser],
    'format_compatible': [CompilerErrorParser, CompatErrorParser],
    'run_format_compatible': [CompilerErrorParser, CompatErrorParser],
    'no_compression': [CompilerErrorParser, GTestErrorParser],
    'run_no_compression': [CompilerErrorParser, GTestErrorParser],
    'regression': [CompilerErrorParser],
    'run_regression': [CompilerErrorParser],
}


def main():
    if len(sys.argv) != 2:
        return 'Usage: %s <test name>' % sys.argv[0]
    test_name = sys.argv[1]
    if test_name not in _TEST_NAME_TO_PARSERS:
        return 'Unknown test name: %s' % test_name

    error_parsers = []
    for parser_cls in _TEST_NAME_TO_PARSERS[test_name]:
        error_parsers.append(parser_cls())

    for line in sys.stdin:
        line = line.strip()
        for error_parser in error_parsers:
            error_msg = error_parser.parse_error(line)
            if error_msg is not None:
                print(error_msg)


if __name__ == '__main__':
    sys.exit(main())
back to top