Revision 239d17a19c3cec16937aa4b6c56c90f4f217addf authored by Peter Dillinger on 18 December 2020, 22:29:48 UTC, committed by Facebook GitHub Bot on 18 December 2020, 22:31:03 UTC
Summary:
Primarily, this change refactors the malloc_usable_size-based
optimize_filters_for_memory code for Bloom filters so that it also
works for Ribbon filters.

This change also replaces the somewhat slow but general
BuiltinFilterBitsBuilder::ApproximateNumEntries with
implementation-specific versions for Ribbon (new) and Legacy Bloom
(based on a recently deleted version). The reason is to emphasize
speed in ApproximateNumEntries rather than 100% accuracy.

Justification: ApproximateNumEntries (formerly CalculateNumEntry) is
only used by RocksDB for range-partitioned filters, called each time we
start to construct one. (In theory, it should be possible to reuse the
estimate, but the abstractions provided by FilterPolicy don't really
make that workable.) But the result is only a heuristic estimate for
hitting a desired partitioned filter size, because partitions are
aligned to data blocks, which contain varying numbers of unique keys or
prefixes. These two factors lead us to prioritize reasonable speed over
100% accuracy.
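
To make the intended tradeoff concrete, here is a minimal sketch (hypothetical code, not the actual implementation in this change) of the shape such an implementation-specific estimate can take for a Bloom-style filter:

```cpp
// Minimal sketch, not RocksDB's actual implementation: a fast,
// implementation-specific ApproximateNumEntries for a Bloom-style filter.
#include <cstddef>

std::size_t ApproximateNumEntriesSketch(std::size_t bytes,
                                        double bits_per_key) {
  // Invert "total bits = num_entries * bits_per_key", ignoring per-filter
  // metadata and rounding; a close answer is good enough because the result
  // only seeds a partition-size heuristic.
  return static_cast<std::size_t>(8.0 * static_cast<double>(bytes) /
                                  bits_per_key);
}
```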

optimize_filters_for_memory adds extra complication: with it enabled,
precisely calculating num_entries for some allowed number of bytes
depends on internal state. And because the allocator-agnostic
implementation of optimize_filters_for_memory uses malloc_usable_size,
we would have to actually allocate memory, many times, just to
determine precisely how many entries (keys) could be added while
staying below some size budget for the current state. (In a draft, I
got this working, and then realized the balance of speed vs. accuracy
was all wrong.)
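
For reference, a minimal sketch of the malloc_usable_size idea (a hypothetical helper assuming a glibc- or jemalloc-style allocator that provides malloc_usable_size; not the code in this change):

```cpp
// Minimal sketch (hypothetical helper, not RocksDB's actual code) of the
// allocator-agnostic idea: after allocating, ask the allocator how much
// memory is really usable and let the filter grow into all of it.
#include <malloc.h>  // malloc_usable_size (glibc; jemalloc exports it too)
#include <cstdlib>
#include <utility>

std::pair<char*, std::size_t> AllocateFilterRoundingUp(std::size_t requested) {
  char* buf = static_cast<char*>(std::malloc(requested));
  // A failed allocation yields {nullptr, 0}; real code would handle it
  // explicitly rather than pass it along.
  std::size_t usable = buf ? malloc_usable_size(buf) : 0;
  // Building the filter over `usable` rather than `requested` bytes closes
  // the gap between requested and actually-allocated memory.
  return {buf, usable};
}
```

This is also why precisely predicting num_entries for a byte budget would require performing real allocations, as described above.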

So, related to that, I have also made CalculateSpace (an internal-only
API used only for testing) non-authoritative when
optimize_filters_for_memory is enabled. This simplifies some code.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7774

Test Plan:
Unit test updated; for the FilterSize test, the range of tested
values is greatly expanded (still super fast).

Also tested `db_bench -benchmarks=fillrandom,stats -bloom_bits=10 -num=1000000 -partition_index_and_filters -format_version=5 [-optimize_filters_for_memory] [-use_ribbon_filter]` with temporary debug output of generated filter sizes.

Bloom+optimize_filters_for_memory:

      1 Filter size: 197 (224 in memory)
    134 Filter size: 3525 (3584 in memory)
    107 Filter size: 4037 (4096 in memory)
    Total on disk: 904,506
    Total in memory: 918,752

Ribbon+optimize_filters_for_memory:

      1 Filter size: 3061 (3072 in memory)
    110 Filter size: 3573 (3584 in memory)
     58 Filter size: 4085 (4096 in memory)
    Total on disk: 633,021 (-30.0%)
    Total in memory: 634,880 (-30.9%)

Bloom (no offm, i.e., without optimize_filters_for_memory):

      1 Filter size: 261 (320 in memory)
      1 Filter size: 3333 (3584 in memory)
    240 Filter size: 3717 (4096 in memory)
    Total on disk: 895,674 (-1% on disk vs. +offm; known tolerable overhead of offm)
    Total in memory: 986,944 (+7.4% vs. +offm)

Ribbon (no offm):

      1 Filter size: 2949 (3072 in memory)
      1 Filter size: 3381 (3584 in memory)
    167 Filter size: 3701 (4096 in memory)
    Total on disk: 624,397 (-30.3% vs. Bloom)
    Total in memory: 690,688 (-30.0% vs. Bloom)

Note that optimize_filters_for_memory is even more effective for Ribbon filters than for cache-local Bloom filters: Ribbon can close the unused-memory gap more tightly because its size varies in 16-byte increments, vs. 64-byte increments for Bloom.
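
As a back-of-envelope check (assuming 5 bytes of per-filter metadata and a 4096-byte allocator size class, numbers chosen to match the debug output above rather than taken from the filter internals):

```cpp
#include <cstddef>
#include <cstdio>

// Largest filter that fits a given allocator size class, when the data
// portion must be a multiple of `increment` and `metadata` bytes ride on top.
std::size_t LargestFilterInBin(std::size_t bin, std::size_t metadata,
                               std::size_t increment) {
  return (bin - metadata) / increment * increment + metadata;
}

int main() {
  std::printf("Bloom  (64-byte steps): %zu\n",
              LargestFilterInBin(4096, 5, 64));  // 4037, as in output above
  std::printf("Ribbon (16-byte steps): %zu\n",
              LargestFilterInBin(4096, 5, 16));  // 4085, as in output above
}
```

The 59-byte vs. 11-byte gap to the 4096-byte boundary is exactly the difference visible in the largest filter sizes above.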

Reviewed By: jay-zhuang

Differential Revision: D25592970

Pulled By: pdillinger

fbshipit-source-id: 606fdaa025bb790d7e9c21601e8ea86e10541912
block_fetcher_test.cc
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "table/block_fetcher.h"

#include "db/table_properties_collector.h"
#include "env/composite_env_wrapper.h"
#include "file/file_util.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "table/block_based/binary_search_index_reader.h"
#include "table/block_based/block_based_table_builder.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/format.h"
#include "test_util/testharness.h"

namespace ROCKSDB_NAMESPACE {
namespace {

// A MemoryAllocator that counts allocations and deallocations so tests can
// verify exactly which buffers BlockFetcher allocates and when they are
// freed.
class CountedMemoryAllocator : public MemoryAllocator {
 public:
  const char* Name() const override { return "CountedMemoryAllocator"; }

  void* Allocate(size_t size) override {
    num_allocations_++;
    return static_cast<void*>(new char[size]);
  }

  void Deallocate(void* p) override {
    num_deallocations_++;
    delete[] static_cast<char*>(p);
  }

  int GetNumAllocations() const { return num_allocations_; }
  int GetNumDeallocations() const { return num_deallocations_; }

 private:
  int num_allocations_ = 0;
  int num_deallocations_ = 0;
};

// Counts how block bytes were copied during a fetch: via the stack buffer,
// a heap buffer, or a compressed-block buffer.
struct MemcpyStats {
  int num_stack_buf_memcpy;
  int num_heap_buf_memcpy;
  int num_compressed_buf_memcpy;
};

// Counts buffer allocations during a fetch, by buffer kind.
struct BufAllocationStats {
  int num_heap_buf_allocations;
  int num_compressed_buf_allocations;
};

struct TestStats {
  MemcpyStats memcpy_stats;
  BufAllocationStats buf_allocation_stats;
};

class BlockFetcherTest : public testing::Test {
 public:
  enum class Mode {
    kBufferedRead = 0,
    kBufferedMmap,
    kDirectRead,
    kNumModes,
  };
  // Use NumModes as the array size to avoid "size of array '...' has
  // non-integral type" errors.
  const static int NumModes = static_cast<int>(Mode::kNumModes);

 protected:
  void SetUp() override {
    SetupSyncPointsToMockDirectIO();
    test_dir_ = test::PerThreadDBPath("block_fetcher_test");
    env_ = Env::Default();
    fs_ = FileSystem::Default();
    ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr));
  }

  void TearDown() override { EXPECT_OK(DestroyDir(env_, test_dir_)); }

  void AssertSameBlock(const std::string& block1, const std::string& block2) {
    ASSERT_EQ(block1, block2);
  }

  // Creates a table with kv pairs (i, i) where i ranges from 0 to 8, inclusive.
  void CreateTable(const std::string& table_name,
                   const CompressionType& compression_type) {
    std::unique_ptr<WritableFileWriter> writer;
    NewFileWriter(table_name, &writer);

    // Create table builder.
    ImmutableCFOptions ioptions(options_);
    InternalKeyComparator comparator(options_.comparator);
    ColumnFamilyOptions cf_options(options_);
    MutableCFOptions moptions(cf_options);
    std::vector<std::unique_ptr<IntTblPropCollectorFactory>> factories;
    std::unique_ptr<TableBuilder> table_builder(table_factory_.NewTableBuilder(
        TableBuilderOptions(ioptions, moptions, comparator, &factories,
                            compression_type, 0 /* sample_for_compression */,
                            CompressionOptions(), false /* skip_filters */,
                            kDefaultColumnFamilyName, -1 /* level */),
        0 /* column_family_id */, writer.get()));

    // Build table.
    for (int i = 0; i < 9; i++) {
      std::string key = ToInternalKey(std::to_string(i));
      std::string value = std::to_string(i);
      table_builder->Add(key, value);
    }
    ASSERT_OK(table_builder->Finish());
  }

  void FetchIndexBlock(const std::string& table_name,
                       CountedMemoryAllocator* heap_buf_allocator,
                       CountedMemoryAllocator* compressed_buf_allocator,
                       MemcpyStats* memcpy_stats, BlockContents* index_block,
                       std::string* result) {
    FileOptions fopt(options_);
    std::unique_ptr<RandomAccessFileReader> file;
    NewFileReader(table_name, fopt, &file);

    // Get handle of the index block.
    Footer footer;
    ReadFooter(file.get(), &footer);
    const BlockHandle& index_handle = footer.index_handle();

    CompressionType compression_type;
    FetchBlock(file.get(), index_handle, BlockType::kIndex,
               false /* compressed */, false /* do_uncompress */,
               heap_buf_allocator, compressed_buf_allocator, index_block,
               memcpy_stats, &compression_type);
    ASSERT_EQ(compression_type, CompressionType::kNoCompression);
    result->assign(index_block->data.ToString());
  }

  // Fetches the first data block in each read mode (buffered, mmap, and
  // direct IO).
  //
  // compressed: whether the data blocks are compressed;
  // do_uncompress: whether the data blocks should be uncompressed on fetching;
  // expected_stats_by_mode: the expected memcpy and buffer allocation
  // statistics for each read mode.
  //
  // Expects:
  // Block contents are the same across modes, and buffer allocation and
  // memcpy statistics match the expected values.
  void TestFetchDataBlock(
      const std::string& table_name_prefix, bool compressed, bool do_uncompress,
      std::array<TestStats, NumModes> expected_stats_by_mode) {
    for (CompressionType compression_type : GetSupportedCompressions()) {
      bool do_compress = compression_type != kNoCompression;
      if (compressed != do_compress) continue;
      std::string compression_type_str =
          CompressionTypeToString(compression_type);

      std::string table_name = table_name_prefix + compression_type_str;
      CreateTable(table_name, compression_type);

      CompressionType expected_compression_type_after_fetch =
          (compressed && !do_uncompress) ? compression_type : kNoCompression;

      BlockContents blocks[NumModes];
      std::string block_datas[NumModes];
      MemcpyStats memcpy_stats[NumModes];
      CountedMemoryAllocator heap_buf_allocators[NumModes];
      CountedMemoryAllocator compressed_buf_allocators[NumModes];
      for (int i = 0; i < NumModes; ++i) {
        SetMode(static_cast<Mode>(i));
        FetchFirstDataBlock(table_name, compressed, do_uncompress,
                            expected_compression_type_after_fetch,
                            &heap_buf_allocators[i],
                            &compressed_buf_allocators[i], &blocks[i],
                            &block_datas[i], &memcpy_stats[i]);
      }

      for (int i = 0; i < NumModes - 1; ++i) {
        AssertSameBlock(block_datas[i], block_datas[i + 1]);
      }

      // Check memcpy and buffer allocation statistics.
      for (int i = 0; i < NumModes; ++i) {
        const TestStats& expected_stats = expected_stats_by_mode[i];

        ASSERT_EQ(memcpy_stats[i].num_stack_buf_memcpy,
                  expected_stats.memcpy_stats.num_stack_buf_memcpy);
        ASSERT_EQ(memcpy_stats[i].num_heap_buf_memcpy,
                  expected_stats.memcpy_stats.num_heap_buf_memcpy);
        ASSERT_EQ(memcpy_stats[i].num_compressed_buf_memcpy,
                  expected_stats.memcpy_stats.num_compressed_buf_memcpy);

        ASSERT_EQ(heap_buf_allocators[i].GetNumAllocations(),
                  expected_stats.buf_allocation_stats.num_heap_buf_allocations);
        ASSERT_EQ(
            compressed_buf_allocators[i].GetNumAllocations(),
            expected_stats.buf_allocation_stats.num_compressed_buf_allocations);

        // The allocated buffers are not deallocated until
        // the block content is deleted.
        ASSERT_EQ(heap_buf_allocators[i].GetNumDeallocations(), 0);
        ASSERT_EQ(compressed_buf_allocators[i].GetNumDeallocations(), 0);
        blocks[i].allocation.reset();
        ASSERT_EQ(heap_buf_allocators[i].GetNumDeallocations(),
                  expected_stats.buf_allocation_stats.num_heap_buf_allocations);
        ASSERT_EQ(
            compressed_buf_allocators[i].GetNumDeallocations(),
            expected_stats.buf_allocation_stats.num_compressed_buf_allocations);
      }
    }
  }

  void SetMode(Mode mode) {
    switch (mode) {
      case Mode::kBufferedRead:
        options_.use_direct_reads = false;
        options_.allow_mmap_reads = false;
        break;
      case Mode::kBufferedMmap:
        options_.use_direct_reads = false;
        options_.allow_mmap_reads = true;
        break;
      case Mode::kDirectRead:
        options_.use_direct_reads = true;
        options_.allow_mmap_reads = false;
        break;
      case Mode::kNumModes:
        assert(false);
    }
  }

 private:
  std::string test_dir_;
  Env* env_;
  std::shared_ptr<FileSystem> fs_;
  BlockBasedTableFactory table_factory_;
  Options options_;

  std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }

  void WriteToFile(const std::string& content, const std::string& filename) {
    std::unique_ptr<FSWritableFile> f;
    ASSERT_OK(fs_->NewWritableFile(Path(filename), FileOptions(), &f, nullptr));
    ASSERT_OK(f->Append(content, IOOptions(), nullptr));
    ASSERT_OK(f->Close(IOOptions(), nullptr));
  }

  void NewFileWriter(const std::string& filename,
                     std::unique_ptr<WritableFileWriter>* writer) {
    std::string path = Path(filename);
    EnvOptions env_options;
    std::unique_ptr<WritableFile> file;
    ASSERT_OK(env_->NewWritableFile(path, &file, env_options));
    writer->reset(new WritableFileWriter(
        NewLegacyWritableFileWrapper(std::move(file)), path, env_options));
  }

  void NewFileReader(const std::string& filename, const FileOptions& opt,
                     std::unique_ptr<RandomAccessFileReader>* reader) {
    std::string path = Path(filename);
    std::unique_ptr<FSRandomAccessFile> f;
    ASSERT_OK(fs_->NewRandomAccessFile(path, opt, &f, nullptr));
    reader->reset(new RandomAccessFileReader(std::move(f), path, env_));
  }

  void NewTableReader(const ImmutableCFOptions& ioptions,
                      const FileOptions& foptions,
                      const InternalKeyComparator& comparator,
                      const std::string& table_name,
                      std::unique_ptr<BlockBasedTable>* table) {
    std::unique_ptr<RandomAccessFileReader> file;
    NewFileReader(table_name, foptions, &file);

    uint64_t file_size = 0;
    ASSERT_OK(env_->GetFileSize(Path(table_name), &file_size));

    std::unique_ptr<TableReader> table_reader;
    ReadOptions ro;
    const auto* table_options =
        table_factory_.GetOptions<BlockBasedTableOptions>();
    ASSERT_NE(table_options, nullptr);
    ASSERT_OK(BlockBasedTable::Open(ro, ioptions, EnvOptions(), *table_options,
                                    comparator, std::move(file), file_size,
                                    &table_reader));

    table->reset(static_cast<BlockBasedTable*>(table_reader.release()));
  }

  std::string ToInternalKey(const std::string& key) {
    InternalKey internal_key(key, 0, ValueType::kTypeValue);
    return internal_key.Encode().ToString();
  }

  void ReadFooter(RandomAccessFileReader* file, Footer* footer) {
    uint64_t file_size = 0;
    ASSERT_OK(env_->GetFileSize(file->file_name(), &file_size));
    IOOptions opts;
    ASSERT_OK(ReadFooterFromFile(opts, file, nullptr /* prefetch_buffer */,
                                 file_size, footer,
                                 kBlockBasedTableMagicNumber));
  }

  // NOTE: compression_type returns the compression type of the fetched block
  // contents, so if the block is fetched and uncompressed, then it's
  // kNoCompression.
  void FetchBlock(RandomAccessFileReader* file, const BlockHandle& block,
                  BlockType block_type, bool compressed, bool do_uncompress,
                  MemoryAllocator* heap_buf_allocator,
                  MemoryAllocator* compressed_buf_allocator,
                  BlockContents* contents, MemcpyStats* stats,
                  CompressionType* compression_type) {
    ImmutableCFOptions ioptions(options_);
    ReadOptions roptions;
    PersistentCacheOptions persistent_cache_options;
    Footer footer;
    ReadFooter(file, &footer);
    std::unique_ptr<BlockFetcher> fetcher(new BlockFetcher(
        file, nullptr /* prefetch_buffer */, footer, roptions, block, contents,
        ioptions, do_uncompress, compressed, block_type,
        UncompressionDict::GetEmptyDict(), persistent_cache_options,
        heap_buf_allocator, compressed_buf_allocator));

    ASSERT_OK(fetcher->ReadBlockContents());

    stats->num_stack_buf_memcpy = fetcher->TEST_GetNumStackBufMemcpy();
    stats->num_heap_buf_memcpy = fetcher->TEST_GetNumHeapBufMemcpy();
    stats->num_compressed_buf_memcpy =
        fetcher->TEST_GetNumCompressedBufMemcpy();

    *compression_type = fetcher->get_compression_type();
  }

  // NOTE: expected_compression_type is the expected compression
  // type of the fetched block content; if the block is uncompressed,
  // then the expected compression type is kNoCompression.
  void FetchFirstDataBlock(const std::string& table_name, bool compressed,
                           bool do_uncompress,
                           CompressionType expected_compression_type,
                           MemoryAllocator* heap_buf_allocator,
                           MemoryAllocator* compressed_buf_allocator,
                           BlockContents* block, std::string* result,
                           MemcpyStats* memcpy_stats) {
    ImmutableCFOptions ioptions(options_);
    InternalKeyComparator comparator(options_.comparator);
    FileOptions foptions(options_);

    // Get block handle for the first data block.
    std::unique_ptr<BlockBasedTable> table;
    NewTableReader(ioptions, foptions, comparator, table_name, &table);

    std::unique_ptr<BlockBasedTable::IndexReader> index_reader;
    ReadOptions ro;
    ASSERT_OK(BinarySearchIndexReader::Create(
        table.get(), ro, nullptr /* prefetch_buffer */, false /* use_cache */,
        false /* prefetch */, false /* pin */, nullptr /* lookup_context */,
        &index_reader));

    std::unique_ptr<InternalIteratorBase<IndexValue>> iter(
        index_reader->NewIterator(
            ReadOptions(), false /* disable_prefix_seek */, nullptr /* iter */,
            nullptr /* get_context */, nullptr /* lookup_context */));
    ASSERT_OK(iter->status());
    iter->SeekToFirst();
    BlockHandle first_block_handle = iter->value().handle;

    // Fetch first data block.
    std::unique_ptr<RandomAccessFileReader> file;
    NewFileReader(table_name, foptions, &file);
    CompressionType compression_type;
    FetchBlock(file.get(), first_block_handle, BlockType::kData, compressed,
               do_uncompress, heap_buf_allocator, compressed_buf_allocator,
               block, memcpy_stats, &compression_type);
    ASSERT_EQ(compression_type, expected_compression_type);
    result->assign(block->data.ToString());
  }
};

// Skip the following tests in lite mode since direct I/O is unsupported.
#ifndef ROCKSDB_LITE

// Fetch the index block in buffered, mmap, and direct IO modes.
// Expects:
// The index block contents are the same across all read modes.
TEST_F(BlockFetcherTest, FetchIndexBlock) {
  for (CompressionType compression : GetSupportedCompressions()) {
    std::string table_name =
        "FetchIndexBlock" + CompressionTypeToString(compression);
    CreateTable(table_name, compression);

    CountedMemoryAllocator allocator;
    MemcpyStats memcpy_stats;
    BlockContents indexes[NumModes];
    std::string index_datas[NumModes];
    for (int i = 0; i < NumModes; ++i) {
      SetMode(static_cast<Mode>(i));
      FetchIndexBlock(table_name, &allocator, &allocator, &memcpy_stats,
                      &indexes[i], &index_datas[i]);
    }
    for (int i = 0; i < NumModes - 1; ++i) {
      AssertSameBlock(index_datas[i], index_datas[i + 1]);
    }
  }
}

// Data blocks are not compressed.
// Fetch a data block in buffered, mmap, and direct IO modes.
// Expects:
// 1. in buffered (non-direct) IO mode, allocate a heap buffer and memcpy the
//    block into the buffer;
// 2. in mmap mode, the block is read directly from the mapped file, with no
//    buffer allocation or memcpy;
// 3. in direct IO mode, allocate a heap buffer and memcpy from the
//    direct IO buffer to the heap buffer.
TEST_F(BlockFetcherTest, FetchUncompressedDataBlock) {
  TestStats expected_non_mmap_stats = {
      {
          0 /* num_stack_buf_memcpy */,
          1 /* num_heap_buf_memcpy */,
          0 /* num_compressed_buf_memcpy */,
      },
      {
          1 /* num_heap_buf_allocations */,
          0 /* num_compressed_buf_allocations */,
      }};
  TestStats expected_mmap_stats = {{
                                       0 /* num_stack_buf_memcpy */,
                                       0 /* num_heap_buf_memcpy */,
                                       0 /* num_compressed_buf_memcpy */,
                                   },
                                   {
                                       0 /* num_heap_buf_allocations */,
                                       0 /* num_compressed_buf_allocations */,
                                   }};
  std::array<TestStats, NumModes> expected_stats_by_mode{{
      expected_non_mmap_stats /* kBufferedRead */,
      expected_mmap_stats /* kBufferedMmap */,
      expected_non_mmap_stats /* kDirectRead */,
  }};
  TestFetchDataBlock("FetchUncompressedDataBlock", false, false,
                     expected_stats_by_mode);
}

// Data blocks are compressed.
// Fetch a data block in buffered, mmap, and direct IO modes,
// but do not uncompress it.
// Expects:
// 1. in buffered (non-direct) IO mode, allocate a compressed buffer and
//    memcpy the block into the buffer;
// 2. in mmap mode, the block is read directly from the mapped file, with no
//    buffer allocation or memcpy;
// 3. in direct IO mode, allocate a compressed buffer and memcpy from the
//    direct IO buffer to the compressed buffer.
TEST_F(BlockFetcherTest, FetchCompressedDataBlock) {
  TestStats expected_non_mmap_stats = {
      {
          0 /* num_stack_buf_memcpy */,
          0 /* num_heap_buf_memcpy */,
          1 /* num_compressed_buf_memcpy */,
      },
      {
          0 /* num_heap_buf_allocations */,
          1 /* num_compressed_buf_allocations */,
      }};
  TestStats expected_mmap_stats = {{
                                       0 /* num_stack_buf_memcpy */,
                                       0 /* num_heap_buf_memcpy */,
                                       0 /* num_compressed_buf_memcpy */,
                                   },
                                   {
                                       0 /* num_heap_buf_allocations */,
                                       0 /* num_compressed_buf_allocations */,
                                   }};
  std::array<TestStats, NumModes> expected_stats_by_mode{{
      expected_non_mmap_stats /* kBufferedRead */,
      expected_mmap_stats /* kBufferedMmap */,
      expected_non_mmap_stats /* kDirectRead */,
  }};
  TestFetchDataBlock("FetchCompressedDataBlock", true, false,
                     expected_stats_by_mode);
}

// Data blocks are compressed.
// Fetch and uncompress a data block in buffered, mmap, and direct IO modes.
// Expects:
// 1. in buffered (non-direct) IO mode, since the block is small, it is first
//    memcpy'ed to a stack buffer, then a heap buffer is allocated and the
//    block is uncompressed into the heap buffer;
// 2. in mmap mode, a heap buffer is allocated and the block is uncompressed
//    from the mapped file directly into the heap buffer;
// 3. in direct IO mode, a heap buffer is allocated, then the block is
//    uncompressed and memcpy'ed from the direct IO buffer into the heap
//    buffer.
TEST_F(BlockFetcherTest, FetchAndUncompressCompressedDataBlock) {
  TestStats expected_buffered_read_stats = {
      {
          1 /* num_stack_buf_memcpy */,
          1 /* num_heap_buf_memcpy */,
          0 /* num_compressed_buf_memcpy */,
      },
      {
          1 /* num_heap_buf_allocations */,
          0 /* num_compressed_buf_allocations */,
      }};
  TestStats expected_mmap_stats = {{
                                       0 /* num_stack_buf_memcpy */,
                                       1 /* num_heap_buf_memcpy */,
                                       0 /* num_compressed_buf_memcpy */,
                                   },
                                   {
                                       1 /* num_heap_buf_allocations */,
                                       0 /* num_compressed_buf_allocations */,
                                   }};
  TestStats expected_direct_read_stats = {
      {
          0 /* num_stack_buf_memcpy */,
          1 /* num_heap_buf_memcpy */,
          0 /* num_compressed_buf_memcpy */,
      },
      {
          1 /* num_heap_buf_allocations */,
          0 /* num_compressed_buf_allocations */,
      }};
  std::array<TestStats, NumModes> expected_stats_by_mode{{
      expected_buffered_read_stats,
      expected_mmap_stats,
      expected_direct_read_stats,
  }};
  TestFetchDataBlock("FetchAndUncompressCompressedDataBlock", true, true,
                     expected_stats_by_mode);
}

#endif  // ROCKSDB_LITE

}  // namespace
}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}