// parbench.cpp, from https://github.com/lorenzhs/BuRR
// Copyright (c) Lorenz Hübschle-Schneider
// All Rights Reserved. This source code is licensed under the Apache 2.0
// License (found in the LICENSE file in the root directory).
// During construction, hash each key only once. This reduces memory traffic
// for large objects (not really relevant here) and, more importantly, cuts
// down on random accesses, even though those are prefetchable (and prefetched).
#define RIBBON_PASS_HASH
#include "pcg-cpp/include/pcg_random.hpp"
#include "ribbon.hpp"
#include "rocksdb/stop_watch.h"
#include <tlx/cmdline_parser.hpp>
#include <tlx/logger.hpp>
#include <tlx/math/aggregate.hpp>
#include <tlx/math/integer_log2.hpp>
#include <tlx/thread_pool.hpp>
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cmath>
#include <cstdlib>
#include <iomanip>
#include <memory>
#include <mutex>
#include <numeric>
#include <random>
#include <sstream>
#include <thread>
#include <utility>
#include <vector>
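// Default RIBBON_BITS to 8 when it is undefined or was defined without a
// value (e.g. a bare -DRIBBON_BITS): in that case VAL##1 expands to just "1".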
#define DO_EXPAND(VAL) VAL##1
#define EXPAND(VAL) DO_EXPAND(VAL)
#if !defined(RIBBON_BITS) || (EXPAND(RIBBON_BITS) == 1)
#undef RIBBON_BITS
#define RIBBON_BITS 8
#endif
using namespace ribbon;
template <uint8_t depth, typename Config>
void run(unsigned num_filters, size_t num_slots, size_t num_queries, double eps,
size_t seed, unsigned num_threads) {
IMPORT_RIBBON_CONFIG(Config);
constexpr bool debug = false;
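// BuRR sizing: the table gets m = (1 + eps) * n slots for n items. eps is
// typically negative here (see main()), i.e. the table is overloaded, and
// items that don't fit are "bumped" into the next recursion level.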
const double slots_per_item = eps + 1.0;
const size_t num_items = num_slots / slots_per_item;
LOG1 << "Running parallel test with " << num_slots << " slots, eps=" << eps
<< " -> " << num_items << " items, seed=" << seed
<< " config: L=" << kCoeffBits << " B=" << kBucketSize
<< " r=" << kResultBits << " FCA1? " << kFirstCoeffAlwaysOne << " SC? "
<< kSparseCoeffs << " mode " << (int)kThreshMode << " interleaved? "
<< kUseInterleavedSol << " CLS? " << kUseCacheLineStorage;
rocksdb::StopWatchNano timer(true);
using ribbon_t = ribbon_filter<depth, Config>;
std::vector<ribbon_t> ribbons(num_filters);
// Generate seeds
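// Layout: seeds[0..num_filters) seed construction; the next num_threads
// entries seed the positive-query threads, the final num_threads entries
// the negative-query threads.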
std::vector<size_t> seeds(num_filters + 2 * num_threads);
{
std::seed_seq seq({seed >> 32, seed});
std::vector<uint32_t> seeds32(2 * num_filters + 4 * num_threads);
seq.generate(seeds32.begin(), seeds32.end());
for (size_t i = 0; i < seeds.size(); i++) {
seeds[i] = (static_cast<uint64_t>(seeds32[2 * i]) << 32) +
static_cast<uint64_t>(seeds32[2 * i + 1]);
}
}
// all threads use the same input keys to make queries easier
auto input = std::make_unique<Key[]>(num_items);
std::iota(input.get(), input.get() + num_items, 0);
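// Keys 0..num_items-1 are inserted into every filter; negative queries below
// shift keys up by num_items, so they are guaranteed to be absent.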
std::mutex stats_lock;
tlx::Aggregate<double> agg_init, agg_add, agg_backsub, agg_total, agg_size,
agg_tl_bumped, agg_tl_empty, agg_tl_frac_empty, agg_tl_thresh_bytes;
auto construct = [&](unsigned id) {
[[maybe_unused]] rocksdb::StopWatchNano timer(true), total_timer(true);
ribbons[id].Init(num_slots, slots_per_item, seeds[id], 0);
auto t_init = timer.ElapsedNanos(true);
LOG << "Ribbon " << id << " allocation took " << t_init / 1e6 << "ms";
ribbons[id].AddRange(input.get(), input.get() + num_items);
auto t_add = timer.ElapsedNanos(true);
LOG << "Ribbon " << id << " insertion took " << t_add / 1e6
<< "ms in total";
ribbons[id].BackSubst();
auto t_backsub = timer.ElapsedNanos(true);
LOG << "Ribbon " << id << " backsubstitution took " << t_backsub / 1e6
<< "ms in total";
auto t_total = total_timer.ElapsedNanos();
auto [tl_bumped, tl_empty_slots, tl_frac_empty, tl_thresh_bytes] =
ribbons[id].GetStats();
const size_t bytes = ribbons[id].Size();
const double relsize =
(bytes * 8 * 100.0) / (num_items * Config::kResultBits);
// statistics data structures are protected by a mutex
stats_lock.lock();
agg_size.add(bytes);
agg_tl_bumped.add(tl_bumped);
agg_tl_empty.add(tl_empty_slots);
agg_tl_frac_empty.add(tl_frac_empty);
agg_tl_thresh_bytes.add(tl_thresh_bytes);
agg_init.add(t_init / 1e6);
agg_add.add(t_add / 1e6);
agg_backsub.add(t_backsub / 1e6);
agg_total.add(t_total / 1e6);
stats_lock.unlock();
LOG1 << "Ribbon " << std::setw(std::ceil(std::log10(num_filters))) << id
<< " total construction time: " << std::fixed
<< std::setprecision(2) << t_total / 1e6
<< "ms = " << std::setprecision(2) << t_total * 1.0 / num_items
<< "ns per item, size: " << bytes << " Bytes = " << relsize << "%";
LOG1 << "RESULT type=cons id=" << id << " n=" << num_items
<< " m=" << num_slots << " eps=" << eps << " d=" << (int)depth
<< dump_config<Config>() << " tcons=" << t_total << " tinit=" << t_init
<< " tadd=" << t_add << " tbacksub=" << t_backsub
<< " bytes=" << bytes << " tlempty=" << tl_empty_slots
<< " tlbumped=" << tl_bumped << " tlemptyfrac=" << tl_frac_empty
<< " tlthreshbytes=" << tl_thresh_bytes
<< " overhead=" << relsize - 100 << " threads=" << num_threads;
};
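// Construct all filters in parallel; loop_until_empty() blocks until every
// enqueued construction job has finished.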
tlx::ThreadPool pool(num_threads);
for (unsigned i = 0; i < num_filters; i++) {
pool.enqueue([&, i]() { construct(i); });
}
pool.loop_until_empty();
auto cons_nanos = timer.ElapsedNanos();
const size_t bytes_per_filter = agg_size.avg();
LOG1 << "Parallel construction of " << num_filters << " filters with "
<< num_threads << " threads took " << cons_nanos / 1e6
<< "ms = " << cons_nanos * 1.0 / (num_items * num_filters)
<< "ns per item; total size " << static_cast<size_t>(agg_size.sum())
<< "B = " << std::setprecision(3) << (agg_size.sum() / 1e9) << "GB or "
<< bytes_per_filter << "B = " << std::setprecision(4)
<< (bytes_per_filter / 1e6) << "MB per filter = "
<< (bytes_per_filter * 8 * 100.0) / (num_items * Config::kResultBits)
<< "%";
/**************************************************************************/
std::vector<uint64_t> t_posquery(num_threads), t_posgen(num_threads),
t_negquery(num_threads), t_neggen(num_threads), num_fps(num_threads);
std::vector<double> fprates(num_threads);
std::vector<char> ok(num_threads);
std::atomic<uint64_t> check_nanos = 0;
auto query = [&](unsigned id, bool positive, unsigned offset = 0) {
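// id picks this thread's seed, positive selects inserted vs. absent keys,
// and offset shifts into the negative-query seed block. Queries are generated
// in blocks so that generation and lookup can be timed separately.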
constexpr size_t block_size = (size_t{1} << 20);
std::vector<std::pair<Key, unsigned>> queries(block_size);
pcg64 rng(seeds[num_filters + id + offset]);
size_t count = 0, my_found = 0;
rocksdb::StopWatchNano timer;
uint64_t t_gen = 0, t_query = 0;
while (count < num_queries) {
// first, generate a block of queries
timer.Start();
size_t block_items = std::min(block_size, num_queries - count);
for (size_t item = 0; item < block_items; item++, count++) {
uint64_t rand = rng();
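// FastRangeGeneric maps a uniform random word to [0, n) with a
// multiply-shift (Lemire's fastrange) instead of a slow modulo.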
Key key = rocksdb::FastRangeGeneric(rand, num_items);
if (!positive)
key += num_items;
unsigned filter = rocksdb::FastRangeGeneric(
rand ^ 0x876f170be4f1fcb9UL, num_filters);
queries[item] = std::make_pair(key, filter);
}
queries.resize(block_items);
t_gen += timer.ElapsedNanos(true);
sLOG << "Generated" << block_items << "items";
for (const auto [key, filter] : queries) {
bool found = ribbons[filter].QueryFilter(key);
assert(!positive || found);
my_found += found;
}
t_query += timer.ElapsedNanos();
sLOG << "Queried" << queries.size() << "items";
}
check_nanos.fetch_add(t_query);
std::stringstream out;
out << "Thread " << std::setw(std::ceil(std::log10(num_threads))) << id
<< " spent " << std::fixed << std::setprecision(2) << t_gen / 1e6
<< "ms generating " << num_queries << " queries, " << t_query / 1e6
<< "ms on queries = " << t_query * 1.0 / num_queries
<< "ns per query. ";
if (positive) {
ok[id] = (my_found == num_queries);
t_posgen[id] = t_gen;
t_posquery[id] = t_query;
out << "Positive check "
<< (my_found == num_queries ? "OK" : "FAILED") << " found "
<< my_found << " of " << num_queries;
} else {
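// A filter storing r result bits has expected false-positive rate 2^-r;
// "ratio" normalizes the measured rate by that expectation.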
double fprate = my_found * 1.0 / num_queries;
t_neggen[id] = t_gen;
t_negquery[id] = t_query;
num_fps[id] = my_found;
fprates[id] = fprate;
out << "Negative queries with " << my_found
<< " FPs = " << std::setprecision(5) << fprate * 100 << "%, i.e. "
<< fprate * (1ul << Config::kResultBits) << "x expected";
}
LOG1 << out.str();
};
std::vector<std::thread> threads;
timer.Start();
for (unsigned i = 0; i < num_threads; i++) {
threads.emplace_back(query, i, true);
}
for (auto& t : threads)
t.join();
const uint64_t t_pos_aggregate = check_nanos,
t_pos_total = timer.ElapsedNanos();
const bool all_ok =
std::all_of(ok.begin(), ok.end(), [](const char o) { return o == 1; });
LOG1 << "Parallel check with " << num_threads << " threads "
<< (all_ok ? "successful" : "FAILED") << " and took "
<< t_pos_total / 1e6 << "ms total; queries took "
<< t_pos_aggregate * 1.0 / (num_queries * num_threads) << "ns per key";
/**************************************************************************/
threads.clear();
check_nanos = 0;
timer.Start();
for (unsigned i = 0; i < num_threads; i++) {
threads.emplace_back(query, i, false, /* seed offset */ num_threads);
}
for (auto& t : threads)
t.join();
const uint64_t t_neg_aggregate = check_nanos,
t_neg_total = timer.ElapsedNanos();
for (unsigned i = 0; i < num_threads; i++) {
LOG1 << "RESULT type=query id=" << i << " n=" << num_items
<< " m=" << num_slots << " eps=" << eps << " d=" << (int)depth
<< dump_config<Config>() << " ok=" << (int)ok[i]
<< " tpos=" << t_posquery[i]
<< " tpospq=" << (t_posquery[i] * 1.0 / num_items)
<< " tneg=" << t_negquery[i]
<< " tneqpq=" << (t_negquery[i] * 1.0 / num_items)
<< " fps=" << num_fps[i] << " fpr=" << fprates[i]
<< " ratio=" << fprates[i] * (1ul << Config::kResultBits)
<< " threads=" << num_threads;
}
const uint64_t found = std::accumulate(num_fps.begin(), num_fps.end(), uint64_t{0});
const double fprate = found * 1.0 / (num_threads * num_queries),
ratio = fprate * (1ul << Config::kResultBits);
const double relsize =
(agg_size.avg() * 8 * 100.0) / (num_items * Config::kResultBits);
LOG1 << "Negative check took " << t_neg_total / 1e6 << "ms total; queries took "
<< t_neg_aggregate * 1.0 / (num_queries * num_threads) << "ns per key, "
<< found << " FPs = " << fprate * 100 << "%, expecting "
<< 100.0 / (1ul << Config::kResultBits) << "% -> ratio = " << ratio;
// Write aggregate results
LOG1 << "RESULT type=agg n=" << num_items << " m=" << num_slots
<< " eps=" << eps << " d=" << (int)depth << dump_config<Config>()
<< " tcons=" << cons_nanos / 1e6 << " tconsavg=" << agg_total.avg()
<< " tconsdev=" << agg_total.stdev() << " tinitavg=" << agg_init.avg()
<< " tinitdev=" << agg_init.stdev() << " taddavg=" << agg_add.avg()
<< " tadddev=" << agg_add.stdev() << " tbacksubavg=" << agg_backsub.avg()
<< " tbacksubdev=" << agg_backsub.stdev()
<< " totalbytes=" << static_cast<size_t>(agg_size.sum())
<< " bytesavg=" << agg_size.avg() << " bytesdev=" << agg_size.stdev()
<< " tlemptyavg=" << agg_tl_empty.avg()
<< " tlemptydev=" << agg_tl_empty.stdev()
<< " tlbumpedavg=" << agg_tl_bumped.avg()
<< " tlbumpeddev=" << agg_tl_bumped.stdev()
<< " tlemptyfracavg=" << agg_tl_frac_empty.avg()
<< " tlepmtyfracdev=" << agg_tl_frac_empty.stdev()
<< " tlthreshbytesavg=" << agg_tl_thresh_bytes.avg()
<< " tlthreshbytesdev=" << agg_tl_thresh_bytes.stdev()
<< " overhead=" << relsize - 100 << " ok=" << all_ok
<< " tposagg=" << t_pos_aggregate << " tpostot=" << t_pos_total
<< " tpospq=" << t_pos_aggregate * 1.0 / (num_threads * num_queries)
<< " tnegagg=" << t_neg_aggregate << " tnegtot=" << t_neg_total
<< " tneqpq=" << t_neg_aggregate * 1.0 / (num_threads * num_queries)
<< " fps=" << found << " fpr=" << fprate << " ratio=" << ratio
<< " threads=" << num_threads;
}
// quiet config for benchmarking: disable logging and use 64-bit keys
template <size_t coeff_bits, size_t result_bits, ThreshMode mode, bool sparse,
bool interleaved, bool cls, int shift = 0>
struct QRConfig
: public RConfig<coeff_bits, result_bits, mode, sparse, interleaved, cls, shift, int64_t> {
static constexpr bool log = false; // quiet
using Key = int64_t;
// using Index = uint64_t;
};
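// The dispatch_* chain below turns runtime options (threshold compression
// mode, recursion depth, band width, storage layout, sparse coefficients)
// into template parameters, instantiating run<>() for each combination.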
template <ThreshMode mode, uint8_t depth, size_t L, size_t r, bool interleaved,
bool cls, typename... Args>
void dispatch_sparse(bool sparse, Args&... args) {
if (sparse) {
if constexpr (interleaved) {
LOG1 << "Sparse coefficients + interleaved sol doesn't make sense";
} else {
// run<depth, QRConfig<L, r, mode, true, interleaved, cls>>(args...);
}
} else {
run<depth, QRConfig<L, r, mode, false, interleaved, cls>>(args...);
}
}
template <ThreshMode mode, uint8_t depth, size_t L, size_t r, typename... Args>
void dispatch_storage(bool cls, bool interleaved, Args&... args) {
assert(!cls || !interleaved);
if (cls) {
dispatch_sparse<mode, depth, L, r, false, true>(args...);
} else if (interleaved) {
dispatch_sparse<mode, depth, L, r, true, false>(args...);
} else {
dispatch_sparse<mode, depth, L, r, false, false>(args...);
}
}
template <ThreshMode mode, uint8_t depth, typename... Args>
void dispatch_width(size_t band_width, Args&... args) {
static constexpr size_t r = RIBBON_BITS;
switch (band_width) {
case 16: dispatch_storage<mode, depth, 16, r>(args...); break;
case 32: dispatch_storage<mode, depth, 32, r>(args...); break;
case 64: dispatch_storage<mode, depth, 64, r>(args...); break;
// case 128: dispatch_storage<mode, depth, 128, r>(args...); break;
default: LOG1 << "Unsupported band width: " << band_width;
}
}
template <ThreshMode mode, typename... Args>
void dispatch_depth(unsigned depth, Args&... args) {
switch (depth) {
case 0: dispatch_width<mode, 0>(args...); break;
case 1: dispatch_width<mode, 1>(args...); break;
case 2: dispatch_width<mode, 2>(args...); break;
case 3: dispatch_width<mode, 3>(args...); break;
// case 4: dispatch_width<mode, 4>(args...); break;
default: LOG1 << "Unsupported recursion depth: " << depth;
}
}
template <typename... Args>
void dispatch(ThreshMode mode, Args&... args) {
switch (mode) {
case ThreshMode::onebit:
dispatch_depth<ThreshMode::onebit>(args...);
break;
case ThreshMode::twobit:
dispatch_depth<ThreshMode::twobit>(args...);
break;
case ThreshMode::normal:
dispatch_depth<ThreshMode::normal>(args...);
break;
default:
LOG1 << "Unsupported threshold compression mode: " << (int)mode;
}
}
int main(int argc, char** argv) {
tlx::CmdlineParser cmd;
size_t seed = 42, num_slots = 1024 * 1024, num_queries = 0;
unsigned ribbon_width = 32, depth = 3;
double eps = -1;
unsigned num_filters = 100, num_threads = std::thread::hardware_concurrency();
bool onebit = false, twobit = false, sparsecoeffs = false, cls = false,
interleaved = false;
cmd.add_size_t('s', "seed", seed, "random seed");
cmd.add_size_t('m', "slots", num_slots, "number of slots per filter");
cmd.add_size_t('q', "queries", num_queries, "number of queries per thread");
cmd.add_unsigned('k', "filters", num_filters, "number of filters (shards)");
cmd.add_unsigned('L', "ribbon_width", ribbon_width, "ribbon width (16/32/64)");
cmd.add_unsigned('d', "depth", depth, "ribbon recursion depth");
cmd.add_double('e', "epsilon", eps, "epsilon, #items = filtersize/(1+epsilon)");
cmd.add_unsigned('t', "threads", num_threads, "number of query threads");
cmd.add_bool('1', "onebit", onebit,
"use one-plus-a-little-bit threshold compression");
cmd.add_bool('2', "twobit", twobit, "use two-bit threshold compression");
cmd.add_bool('S', "sparsecoeffs", sparsecoeffs,
"use sparse coefficient vectors");
cmd.add_bool('C', "cls", cls, "use cache-line solution storage");
cmd.add_bool('I', "interleaved", interleaved,
"use interleaved solution storage");
if (!cmd.process(argc, argv) || (onebit && twobit)) {
cmd.print_usage();
return 1;
}
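// No epsilon given: pick a default overloading factor. The plain and two-bit
// variants use eps = -3/L; the one-bit variant derives its overload from the
// bucket size implied by the ribbon width.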
if (eps == -1) {
if (onebit) {
size_t bucket_size = 1ul << tlx::integer_log2_floor(
ribbon_width * ribbon_width /
(4 * tlx::integer_log2_ceil(ribbon_width)));
eps = -0.666 * ribbon_width / (4 * bucket_size + ribbon_width);
} else {
eps = -3.0 / ribbon_width;
}
}
if (num_queries == 0)
num_queries = num_slots * num_filters;
if (seed == 0)
seed = std::random_device{}();
cmd.print_result();
ThreshMode mode = onebit ? ThreshMode::onebit
: (twobit ? ThreshMode::twobit : ThreshMode::normal);
dispatch(mode, depth, ribbon_width, cls, interleaved, sparsecoeffs,
num_filters, num_slots, num_queries, eps, seed, num_threads);
}
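// Example invocation (assuming the binary is built as "parbench"):
//   ./parbench -m 16777216 -k 100 -t 8 -L 64 -d 3 -q 1000000 -1
// builds 100 bumped ribbon filters with 2^24 slots each, 64-bit bands, and
// one-plus-a-little-bit threshold compression, then runs 10^6 positive and
// 10^6 negative queries per thread.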