Revision b55b2f45d04d95010cd1a40f2701990abe43c3de authored by Peter Dillinger on 05 September 2019, 21:57:39 UTC, committed by Facebook Github Bot on 05 September 2019, 21:59:25 UTC
Summary: Since DynamicBloom is now only used in-memory, we're free to change it without schema compatibility issues. The new implementation is drawn from (with manifest permission) https://github.com/pdillinger/wormhashing/blob/303542a767437f56d8b66cea6ebecaac0e6a61e9/bloom_simulation_tests/foo.cc#L613

This has several speed advantages over the prior implementation:
* Uses fastrange instead of %
* Minimum logic to determine first (and all) probed memory addresses
* (Major) Two probes per 64-bit memory fetch/write.
* Very fast and effective (murmur-like) hash expansion/re-mixing. (At least on recent CPUs, integer multiplication is very cheap.)

While a Bloom filter with 512-bit cache locality has about a 1.15x FP rate penalty (e.g. 0.84% to 0.97%), further restricting to two probes per 64 bits incurs an additional 1.12x FP rate penalty (e.g. 0.97% to 1.09%). Nevertheless, the unit tests show no "mediocre" FP rate samples, unlike the old implementation with more erratic FP rates.

Especially for the memtable, we expect speed to outweigh somewhat higher FP rates. For example, a negative table query would have to be 1000x slower than a BF query to justify doubling BF query time to shave 10% off FP rate (working assumption around 1% FP rate). While that seems likely for SSTs, my data suggests a speed factor of roughly 50x for the memtable (vs. BF; ~1.5% lower write throughput when enabling memtable Bloom filter, after this change). Thus, it's probably not worth even 5% more time in the Bloom filter to shave off 1/10th of the Bloom FP rate, or 0.1% in absolute terms, and it's probably at least 20% slower to recoup that much FP rate from this new implementation. Because of this, we do not see a need for a 'locality' option that affects the MemTable Bloom filter and have decoupled the MemTable Bloom filter from Options::bloom_locality.

Note that just 3% more memory to the Bloom filter (10.3 bits per key vs. just 10) is able to make up for the ~12% FP rate drop in the new implementation:

    [] # Nearly "ideal" FP-wise but reasonably fast cache-local implementation
    [~/wormhashing/bloom_simulation_tests] ./foo_gcc_IMPL_CACHE_WORM64_FROM32_any.out 10000000 6 10 $RANDOM 100000000
    ./foo_gcc_IMPL_CACHE_WORM64_FROM32_any.out time: 3.29372 sampled_fp_rate: 0.00985956 ...

    [] # Close match to this new implementation
    [~/wormhashing/bloom_simulation_tests] ./foo_gcc_IMPL_CACHE_MUL64_BLOCK_FROM32_any.out 10000000 6 10.3 $RANDOM 100000000
    ./foo_gcc_IMPL_CACHE_MUL64_BLOCK_FROM32_any.out time: 2.10072 sampled_fp_rate: 0.00985655 ...

    [] # Old locality=1 implementation
    [~/wormhashing/bloom_simulation_tests] ./foo_gcc_IMPL_CACHE_ROCKSDB_DYNAMIC_any.out 10000000 6 10 $RANDOM 100000000
    ./foo_gcc_IMPL_CACHE_ROCKSDB_DYNAMIC_any.out time: 3.95472 sampled_fp_rate: 0.00988943 ...

Also note the dramatic speed improvement vs. alternatives.

--

Performance unit test: DynamicBloomTest.concurrent_with_perf is updated to report more precise timing data. (Measure running time of each thread, not just longest running thread, etc.) Results averaged over various sizes enabled with --enable_perf and 20 runs each; old dynamic bloom refers to locality=1, the faster of the old:

    old dynamic bloom, avg add latency = 65.6468
    new dynamic bloom, avg add latency = 44.3809
    old dynamic bloom, avg query latency = 50.6485
    new dynamic bloom, avg query latency = 43.2186
    old avg parallel add latency = 41.678
    new avg parallel add latency = 24.5238
    old avg parallel hit latency = 14.6322
    new avg parallel hit latency = 12.3939
    old avg parallel miss latency = 16.7289
    new avg parallel miss latency = 12.2134

Tested on a dedicated 64-bit production machine at Facebook. Significant improvement all around.

Despite now using std::atomic<uint64_t>, quick before-and-after test on a 32-bit machine (Intel Atom N270, released 2008) shows no regression in performance, in some cases modest improvement.
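The break-even figures quoted in the summary (1000x for doubling Bloom filter query time, and the comparison against a roughly 50x memtable factor) follow from a simple cost model: a negative lookup costs one Bloom query plus the FP rate times one table query. The sketch below is only an illustration of that arithmetic; the function name `BreakEvenTableToBloomRatio` is hypothetical and not part of RocksDB.

```cpp
#include <cassert>
#include <cmath>

// Hypothetical helper, for illustration only.
//
// Cost model for a negative lookup:
//   cost = T_bloom + fp_rate * T_table
// Spending `extra_bloom_time` (as a fraction of one Bloom query, e.g. 1.0 for
// "doubling") pays off only if it saves at least as much table time:
//   extra_bloom_time * T_bloom <= fp_rate_saved * T_table
// so the break-even ratio T_table / T_bloom is extra_bloom_time / fp_rate_saved.
inline double BreakEvenTableToBloomRatio(double extra_bloom_time,
                                         double fp_rate_saved) {
  assert(extra_bloom_time >= 0.0);
  assert(fp_rate_saved > 0.0);
  return extra_bloom_time / fp_rate_saved;
}
```

With a 1% FP rate, shaving 10% off it saves 0.001 in absolute terms, so `BreakEvenTableToBloomRatio(1.0, 0.001)` gives about 1000, and `BreakEvenTableToBloomRatio(0.05, 0.001)` gives about 50, which is the memtable speed factor mentioned in the summary.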
--

Performance integration test (synthetic): with DEBUG_LEVEL=0, used

    TEST_TMPDIR=/dev/shm ./db_bench --benchmarks=fillrandom,readmissing,readrandom,stats --num=2000000

and optionally with

    -memtable_whole_key_filtering -memtable_bloom_size_ratio=0.01

300 runs each configuration.

Write throughput change by enabling memtable bloom:
    Old locality=0: -3.06%
    Old locality=1: -2.37%
    New: -1.50%
    conclusion -> seems to substantially close the gap

Readmissing throughput change by enabling memtable bloom:
    Old locality=0: +34.47%
    Old locality=1: +34.80%
    New: +33.25%
    conclusion -> maybe a small new penalty from FP rate

Readrandom throughput change by enabling memtable bloom:
    Old locality=0: +31.54%
    Old locality=1: +31.13%
    New: +30.60%
    conclusion -> maybe also from FP rate (after memtable flush)

--

Another conclusion we can draw from this new implementation is that the existing 32-bit hash function is not inherently crippling the Bloom filter speed or accuracy, below about 5 million keys. For speed, the implementation is essentially the same whether starting with 32-bits or 64-bits of hash; it just determines whether the first multiplication after fastrange is a pseudorandom expansion or needed re-mix. Note that this multiplication can occur while memory is fetching.

For accuracy, in a standard configuration, you need about 5 million keys before you have about a 1.1x FP penalty due to using a 32-bit hash vs. 64-bit:

    [~/wormhashing/bloom_simulation_tests] ./foo_gcc_IMPL_CACHE_MUL64_BLOCK_FROM32_any.out $((5 * 1000 * 1000 * 10)) 6 10 $RANDOM 100000000
    ./foo_gcc_IMPL_CACHE_MUL64_BLOCK_FROM32_any.out time: 2.52069 sampled_fp_rate: 0.0118267 ...

    [~/wormhashing/bloom_simulation_tests] ./foo_gcc_IMPL_CACHE_MUL64_BLOCK_any.out $((5 * 1000 * 1000 * 10)) 6 10 $RANDOM 100000000
    ./foo_gcc_IMPL_CACHE_MUL64_BLOCK_any.out time: 2.43871 sampled_fp_rate: 0.0109059

Pull Request resolved: https://github.com/facebook/rocksdb/pull/5762
Differential Revision: D17214194
Pulled By: pdillinger
fbshipit-source-id: ad9da031772e985fd6b62a0e1db8e81892520595
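The techniques named in the summary (fastrange instead of %, two probes per 64-bit word, and a multiplication that expands a 32-bit hash) can be illustrated with a small standalone filter. This is a minimal sketch and not the DynamicBloom code from this commit: the class name `TinyBloom`, the multiplier constant, the rotation amount, and the `base ^ i` word selection are all assumptions chosen to keep the example short, and the sketch is not thread-safe.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Map a 32-bit hash onto [0, n) with one multiply and shift instead of '%'.
inline uint32_t FastRange32(uint32_t hash, uint32_t n) {
  return static_cast<uint32_t>((static_cast<uint64_t>(hash) * n) >> 32);
}

// Illustrative only; names and layout are assumptions, not RocksDB's code.
class TinyBloom {
 public:
  // The word count is rounded up to a multiple of 8 so that all probes for
  // a key stay inside one aligned 512-bit (8 x 64-bit) block.
  TinyBloom(uint32_t num_words, int num_double_probes)
      : words_((static_cast<size_t>(num_words) + 7) / 8 * 8, 0),
        num_double_probes_(num_double_probes) {
    assert(num_words > 0);
    assert(num_double_probes >= 1 && num_double_probes <= 8);
  }

  void Add(uint32_t h32) {
    uint64_t h = Expand(h32);
    size_t base = FastRange32(h32, static_cast<uint32_t>(words_.size()));
    for (int i = 0; i < num_double_probes_; ++i) {
      // Two bits set with a single 64-bit write.
      words_[base ^ static_cast<size_t>(i)] |= TwoBitMask(h);
      h = Rotate(h);
    }
  }

  bool MayContain(uint32_t h32) const {
    uint64_t h = Expand(h32);
    size_t base = FastRange32(h32, static_cast<uint32_t>(words_.size()));
    for (int i = 0; i < num_double_probes_; ++i) {
      // Two bits checked with a single 64-bit read.
      uint64_t mask = TwoBitMask(h);
      if ((words_[base ^ static_cast<size_t>(i)] & mask) != mask) return false;
      h = Rotate(h);
    }
    return true;
  }

 private:
  // Pseudorandom expansion of 32 hash bits to 64 via one multiplication.
  // The constant is the golden-ratio multiplier also used by splitmix64.
  static uint64_t Expand(uint32_t h32) { return 0x9e3779b97f4a7c15ULL * h32; }
  // Pick two bit positions (6 bits each) within a 64-bit word.
  static uint64_t TwoBitMask(uint64_t h) {
    return (uint64_t{1} << (h & 63)) | (uint64_t{1} << ((h >> 6) & 63));
  }
  // Rotate right by 12 so the next probe pair sees fresh bits.
  static uint64_t Rotate(uint64_t h) { return (h >> 12) | (h << 52); }

  std::vector<uint64_t> words_;
  int num_double_probes_;
};
```

Because `base` and `base ^ i` differ only in their low three bits, every probe for a key touches the same 8-word block, which is the cache-locality property discussed above. A filter built this way never reports a false negative: every hash passed to `Add` is later accepted by `MayContain`.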
1 parent 19e8c9b
block_fetcher.cc
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_fetcher.h"
#include <cinttypes>
#include <string>
#include "logging/logging.h"
#include "memory/memory_allocator.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/env.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/format.h"
#include "table/persistent_cache_helper.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/xxhash.h"
namespace rocksdb {
inline void BlockFetcher::CheckBlockChecksum() {
  // Check the crc of the type and the block contents
  if (read_options_.verify_checksums) {
    const char* data = slice_.data();  // Pointer to where Read put the data
    PERF_TIMER_GUARD(block_checksum_time);
    uint32_t value = DecodeFixed32(data + block_size_ + 1);
    uint32_t actual = 0;
    switch (footer_.checksum()) {
      case kNoChecksum:
        break;
      case kCRC32c:
        value = crc32c::Unmask(value);
        actual = crc32c::Value(data, block_size_ + 1);
        break;
      case kxxHash:
        actual = XXH32(data, static_cast<int>(block_size_) + 1, 0);
        break;
      case kxxHash64:
        actual = static_cast<uint32_t>(
            XXH64(data, static_cast<int>(block_size_) + 1, 0) &
            uint64_t{0xffffffff});
        break;
      default:
        status_ = Status::Corruption(
            "unknown checksum type " + ToString(footer_.checksum()) + " in " +
            file_->file_name() + " offset " + ToString(handle_.offset()) +
            " size " + ToString(block_size_));
    }
    if (status_.ok() && actual != value) {
      status_ = Status::Corruption(
          "block checksum mismatch: expected " + ToString(actual) + ", got " +
          ToString(value) + " in " + file_->file_name() + " offset " +
          ToString(handle_.offset()) + " size " + ToString(block_size_));
    }
  }
}
inline bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() {
  if (cache_options_.persistent_cache &&
      !cache_options_.persistent_cache->IsCompressed()) {
    Status status = PersistentCacheHelper::LookupUncompressedPage(
        cache_options_, handle_, contents_);
    if (status.ok()) {
      // uncompressed page is found for the block handle
      return true;
    } else {
      // uncompressed page is not found
      if (ioptions_.info_log && !status.IsNotFound()) {
        assert(!status.ok());
        ROCKS_LOG_INFO(ioptions_.info_log,
                       "Error reading from persistent cache. %s",
                       status.ToString().c_str());
      }
    }
  }
  return false;
}
inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
  if (prefetch_buffer_ != nullptr &&
      prefetch_buffer_->TryReadFromCache(
          handle_.offset(),
          static_cast<size_t>(handle_.size()) + kBlockTrailerSize, &slice_,
          for_compaction_)) {
    block_size_ = static_cast<size_t>(handle_.size());
    CheckBlockChecksum();
    if (!status_.ok()) {
      return true;
    }
    got_from_prefetch_buffer_ = true;
    used_buf_ = const_cast<char*>(slice_.data());
  }
  return got_from_prefetch_buffer_;
}
inline bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
  if (cache_options_.persistent_cache &&
      cache_options_.persistent_cache->IsCompressed()) {
    // lookup compressed (raw) cache mode p-cache
    std::unique_ptr<char[]> raw_data;
    status_ = PersistentCacheHelper::LookupRawPage(
        cache_options_, handle_, &raw_data, block_size_ + kBlockTrailerSize);
    if (status_.ok()) {
      heap_buf_ = CacheAllocationPtr(raw_data.release());
      used_buf_ = heap_buf_.get();
      slice_ = Slice(heap_buf_.get(), block_size_);
      return true;
    } else if (!status_.IsNotFound() && ioptions_.info_log) {
      assert(!status_.ok());
      ROCKS_LOG_INFO(ioptions_.info_log,
                     "Error reading from persistent cache. %s",
                     status_.ToString().c_str());
    }
  }
  return false;
}
inline void BlockFetcher::PrepareBufferForBlockFromFile() {
  // cache miss read from device
  if (do_uncompress_ &&
      block_size_ + kBlockTrailerSize < kDefaultStackBufferSize) {
    // If we've got a small enough hunk of data, read it in to the
    // trivially allocated stack buffer instead of needing a full malloc()
    used_buf_ = &stack_buf_[0];
  } else if (maybe_compressed_ && !do_uncompress_) {
    compressed_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize,
                                    memory_allocator_compressed_);
    used_buf_ = compressed_buf_.get();
  } else {
    heap_buf_ =
        AllocateBlock(block_size_ + kBlockTrailerSize, memory_allocator_);
    used_buf_ = heap_buf_.get();
  }
}
inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() {
  if (status_.ok() && read_options_.fill_cache &&
      cache_options_.persistent_cache &&
      cache_options_.persistent_cache->IsCompressed()) {
    // insert to raw cache
    PersistentCacheHelper::InsertRawPage(cache_options_, handle_, used_buf_,
                                         block_size_ + kBlockTrailerSize);
  }
}

inline void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() {
  if (status_.ok() && !got_from_prefetch_buffer_ && read_options_.fill_cache &&
      cache_options_.persistent_cache &&
      !cache_options_.persistent_cache->IsCompressed()) {
    // insert to uncompressed cache
    PersistentCacheHelper::InsertUncompressedPage(cache_options_, handle_,
                                                  *contents_);
  }
}

inline void BlockFetcher::CopyBufferToHeap() {
  assert(used_buf_ != heap_buf_.get());
  heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, memory_allocator_);
  memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize);
}
inline void BlockFetcher::GetBlockContents() {
  if (slice_.data() != used_buf_) {
    // the slice content is not the buffer provided
    *contents_ = BlockContents(Slice(slice_.data(), block_size_));
  } else {
    // page can be either uncompressed or compressed, the buffer either stack
    // or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
    if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
      CopyBufferToHeap();
    } else if (used_buf_ == compressed_buf_.get()) {
      if (compression_type_ == kNoCompression &&
          memory_allocator_ != memory_allocator_compressed_) {
        CopyBufferToHeap();
      } else {
        heap_buf_ = std::move(compressed_buf_);
      }
    }
    *contents_ = BlockContents(std::move(heap_buf_), block_size_);
  }
#ifndef NDEBUG
  contents_->is_raw_block = true;
#endif
}
Status BlockFetcher::ReadBlockContents() {
  block_size_ = static_cast<size_t>(handle_.size());

  if (TryGetUncompressBlockFromPersistentCache()) {
    compression_type_ = kNoCompression;
#ifndef NDEBUG
    contents_->is_raw_block = true;
#endif  // NDEBUG
    return Status::OK();
  }
  if (TryGetFromPrefetchBuffer()) {
    if (!status_.ok()) {
      return status_;
    }
  } else if (!TryGetCompressedBlockFromPersistentCache()) {
    PrepareBufferForBlockFromFile();
    Status s;

    {
      PERF_TIMER_GUARD(block_read_time);
      // Actual file read
      status_ = file_->Read(handle_.offset(), block_size_ + kBlockTrailerSize,
                            &slice_, used_buf_, for_compaction_);
    }
    PERF_COUNTER_ADD(block_read_count, 1);

    // TODO: introduce dedicated perf counter for range tombstones
    switch (block_type_) {
      case BlockType::kFilter:
        PERF_COUNTER_ADD(filter_block_read_count, 1);
        break;

      case BlockType::kCompressionDictionary:
        PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
        break;

      case BlockType::kIndex:
        PERF_COUNTER_ADD(index_block_read_count, 1);
        break;

      // Nothing to do here as we don't have counters for the other types.
      default:
        break;
    }

    PERF_COUNTER_ADD(block_read_byte, block_size_ + kBlockTrailerSize);
    if (!status_.ok()) {
      return status_;
    }

    if (slice_.size() != block_size_ + kBlockTrailerSize) {
      return Status::Corruption("truncated block read from " +
                                file_->file_name() + " offset " +
                                ToString(handle_.offset()) + ", expected " +
                                ToString(block_size_ + kBlockTrailerSize) +
                                " bytes, got " + ToString(slice_.size()));
    }

    CheckBlockChecksum();
    if (status_.ok()) {
      InsertCompressedBlockToPersistentCacheIfNeeded();
    } else {
      return status_;
    }
  }

  PERF_TIMER_GUARD(block_decompress_time);

  compression_type_ = get_block_compression_type(slice_.data(), block_size_);

  if (do_uncompress_ && compression_type_ != kNoCompression) {
    // compressed page, uncompress, update cache
    UncompressionContext context(compression_type_);
    UncompressionInfo info(context, uncompression_dict_, compression_type_);
    status_ = UncompressBlockContents(info, slice_.data(), block_size_,
                                      contents_, footer_.version(), ioptions_,
                                      memory_allocator_);
    compression_type_ = kNoCompression;
  } else {
    GetBlockContents();
  }

  InsertUncompressedBlockToPersistentCacheIfNeeded();

  return status_;
}
} // namespace rocksdb
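
CheckBlockChecksum above reads a fixed 32-bit value from the bytes that follow the block payload and its one-byte type, then compares it with a checksum computed over the payload plus that type byte. The standalone sketch below illustrates that trailer layout only. It is not RocksDB code: `ToyChecksum` is an FNV-1a stand-in for CRC32C or xxHash, the helper names are hypothetical, the 5-byte trailer size is an assumption derived from "1 type byte + 4 checksum bytes", and the CRC32C masking step is omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Stand-in 32-bit checksum (FNV-1a). NOT what RocksDB uses; chosen for brevity.
inline uint32_t ToyChecksum(const char* data, size_t n) {
  uint32_t h = 2166136261u;
  for (size_t i = 0; i < n; ++i) {
    h ^= static_cast<unsigned char>(data[i]);
    h *= 16777619u;
  }
  return h;
}

// Decode a little-endian 32-bit value from four bytes.
inline uint32_t DecodeLE32(const char* p) {
  const unsigned char* u = reinterpret_cast<const unsigned char*>(p);
  return static_cast<uint32_t>(u[0]) | (static_cast<uint32_t>(u[1]) << 8) |
         (static_cast<uint32_t>(u[2]) << 16) |
         (static_cast<uint32_t>(u[3]) << 24);
}

// Assumed trailer size: 1 type byte + 4 checksum bytes.
constexpr size_t kToyTrailerSize = 5;

// Build [payload][type byte][checksum over payload + type byte].
inline std::string AppendTrailer(const std::string& payload, char type) {
  std::string out = payload;
  out.push_back(type);
  const uint32_t sum = ToyChecksum(out.data(), out.size());
  for (int i = 0; i < 4; ++i) {
    out.push_back(static_cast<char>((sum >> (8 * i)) & 0xff));
  }
  return out;
}

// Returns true if the buffer has the expected size and the stored checksum
// matches the one recomputed over payload + type byte.
inline bool VerifyTrailer(const std::string& buf, size_t payload_size) {
  if (buf.size() != payload_size + kToyTrailerSize) {
    return false;  // truncated or oversized read
  }
  const uint32_t stored = DecodeLE32(buf.data() + payload_size + 1);
  const uint32_t computed = ToyChecksum(buf.data(), payload_size + 1);
  return stored == computed;
}
```

The size check mirrors the "truncated block read" test in ReadBlockContents, and the stored-versus-computed comparison mirrors the `actual != value` test in CheckBlockChecksum.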