https://github.com/lorenzhs/BuRR
Raw File
Tip revision: 1c62832ad7d6eab5b337f386955868c3ce9a54ea authored by Lorenz Hübschle-Schneider on 11 September 2021, 12:56:53 UTC
README: paper link, bibtex
Tip revision: 1c62832
ribbon.cpp
//  Copyright (c) Lorenz Hübschle-Schneider
//  All Rights Reserved.  This source code is licensed under the Apache 2.0
//  License (found in the LICENSE file in the root directory).

#include "ribbon.hpp"
#include "rocksdb/stop_watch.h"

#include <tlx/cmdline_parser.hpp>
#include <tlx/logger.hpp>

#include <atomic>
#include <cstdlib>
#include <numeric>
#include <thread>
#include <vector>

#define DO_EXPAND(VAL) VAL##1
#define EXPAND(VAL)    DO_EXPAND(VAL)

#if !defined(RIBBON_BITS) || (EXPAND(RIBBON_BITS) == 1)
#undef RIBBON_BITS
#define RIBBON_BITS 8
#endif

using namespace ribbon;

bool no_queries = false;

template <uint8_t depth, typename Config>
void run(size_t num_slots, double eps, size_t seed, unsigned num_threads) {
    IMPORT_RIBBON_CONFIG(Config);

    const double slots_per_item = eps + 1.0;
    const size_t num_items = num_slots / slots_per_item;
    LOG1 << "Running simple test with " << num_slots << " slots, eps=" << eps
         << " -> " << num_items << " items, seed=" << seed
         << " config: L=" << kCoeffBits << " B=" << kBucketSize
         << " r=" << kResultBits;

    rocksdb::StopWatchNano timer(true);

    auto input = std::make_unique<int[]>(num_items);
    std::iota(input.get(), input.get() + num_items, 0);
    LOG1 << "Input generation took " << timer.ElapsedNanos(true) / 1e6 << "ms";

    ribbon_filter<depth, Config> r(num_slots, slots_per_item, seed);

    LOG1 << "Allocation took " << timer.ElapsedNanos(true) / 1e6 << "ms\n";

    LOG1 << "Adding rows to filter....";
    r.AddRange(input.get(), input.get() + num_items);
    LOG1 << "Insertion took " << timer.ElapsedNanos(true) / 1e6 << "ms in total\n";

    input.reset();

    r.BackSubst();
    LOG1 << "Backsubstitution took " << timer.ElapsedNanos(true) / 1e6
         << "ms in total\n";

    const size_t bytes = r.Size();
    const double relsize = (bytes * 8 * 100.0) / (num_items * Config::kResultBits);
    LOG1 << "Ribbon size: " << bytes << " Bytes = " << (bytes * 1.0) / num_items
         << " Bytes per item = " << relsize << "%\n";

    std::atomic<bool> ok = true;
    auto pos_query = [&r, &ok, &num_items, &num_threads](unsigned id) {
        bool my_ok = true;
        size_t start = num_items / num_threads * id;
        // don't do the same queries on all threads
        for (size_t v = start; v < num_items; v++) {
            bool found = r.QueryFilter((int)v);
            assert(found);
            my_ok &= found;
        }
        for (size_t v = 0; v < start; v++) {
            bool found = r.QueryFilter((int)v);
            assert(found);
            my_ok &= found;
        }
        if (!my_ok)
            ok = false;
    };
    std::vector<std::thread> threads;
    for (unsigned i = 0; i < num_threads && !no_queries; i++) {
        threads.emplace_back(pos_query, i);
    }
    for (auto& t : threads)
        t.join();

    const auto check_nanos = timer.ElapsedNanos(true);
    LOG1 << "Parallel check with " << num_threads << " threads "
         << (ok ? "successful" : "FAILED") << " and took " << check_nanos / 1e6
         << "ms = " << check_nanos * 1.0 / num_items << "ns per key";
    // r.PrintStats();

    std::atomic<size_t> found = 0;
    auto neg_query = [&r, &found, &num_items, &num_threads](unsigned id) {
        size_t my_found = 0;
        // offset queries between threads
        size_t start = num_items + num_items / num_threads * id;
        for (size_t v = start; v < 2 * num_items; v++) {
            my_found += r.QueryFilter((int)v);
        }
        for (size_t v = num_items; v < start; v++) {
            my_found += r.QueryFilter((int)v);
        }
        found.fetch_add(my_found);
    };
    threads.clear();
    for (unsigned i = 0; i < num_threads && !no_queries; i++) {
        threads.emplace_back(neg_query, i);
    }
    for (auto& t : threads)
        t.join();

    const auto negq_nanos = timer.ElapsedNanos(true);
    const double fprate = found * 1.0 / (num_threads * num_items),
                 ratio = fprate * (1ul << Config::kResultBits);
    LOG1 << "Negative queries took " << negq_nanos / 1e6
         << "ms = " << negq_nanos * 1.0 / num_items << "ns per key, " << found
         << " FPs = " << fprate * 100 << "%, expecting "
         << 100.0 / (1ul << Config::kResultBits) << "% -> ratio = " << ratio;
    // r.PrintStats();
    auto [tl_bumped, tl_empty_slots, tl_frac_empty, tl_thresh_bytes] =
        r.GetStats();
    LOG1 << "RESULT n=" << num_items << " m=" << num_slots << " eps=" << eps
         << " d=" << (int)depth << dump_config<Config>() << " bytes=" << bytes
         << " tlempty=" << tl_empty_slots << " tlbumped=" << tl_bumped
         << " tlemptyfrac=" << tl_frac_empty
         << " tlthreshbytes=" << tl_thresh_bytes << " overhead=" << relsize - 100
         << " ok=" << ok << " tpos=" << check_nanos
         << " tpospq=" << (check_nanos * 1.0 / num_items) << " tneg=" << negq_nanos
         << " tnegpq=" << (negq_nanos * 1.0 / num_items) << " fps=" << found
         << " fpr=" << fprate << " ratio=" << ratio << " threads=" << num_threads;
}


template <ThreshMode mode, uint8_t depth, size_t L, size_t r, bool interleaved,
          bool cls, bool sparse, typename... Args>
void dispatch_shift(int shift, Args&... args) {
    switch (shift) {
        case 0:
            run<depth, RConfig<L, r, mode, sparse, interleaved, cls, 0>>(args...);
            break;
        case -1:
            run<depth, RConfig<L, r, mode, sparse, interleaved, cls, -1>>(args...);
            break;
        case 1:
            run<depth, RConfig<L, r, mode, sparse, interleaved, cls, 1>>(args...);
            break;
        default: LOG1 << "Unsupported bucket size shift: " << shift;
    }
}


template <ThreshMode mode, uint8_t depth, size_t L, size_t r, bool interleaved,
          bool cls, typename... Args>
void dispatch_sparse(bool sparse, Args&... args) {
    if (sparse) {
        if constexpr (interleaved) {
            LOG1 << "Sparse coefficients + interleaved sol doesn't make sense";
        } else {
            dispatch_shift<mode, depth, L, r, interleaved, cls, true>(args...);
        }
    } else {
        dispatch_shift<mode, depth, L, r, interleaved, cls, false>(args...);
    }
}

template <ThreshMode mode, uint8_t depth, size_t L, size_t r, typename... Args>
void dispatch_storage(bool cls, bool interleaved, Args&... args) {
    assert(!cls || !interleaved);
    if (cls) {
        // dispatch_sparse<mode, depth, L, r, false, true>(args...);
        LOG1 << "Cache-Line Storage is currently disabled";
    } else if (interleaved) {
        dispatch_sparse<mode, depth, L, r, true, false>(args...);
    } else {
        dispatch_sparse<mode, depth, L, r, false, false>(args...);
    }
}

template <ThreshMode mode, uint8_t depth, typename... Args>
void dispatch_width(size_t band_width, Args&... args) {
    static constexpr size_t r = RIBBON_BITS;
    switch (band_width) {
        // case 16: dispatch_storage<mode, depth, 16, r>(args...); break;
        case 32: dispatch_storage<mode, depth, 32, r>(args...); break;
        case 64: dispatch_storage<mode, depth, 64, r>(args...); break;
        case 128: dispatch_storage<mode, depth, 128, r>(args...); break;
        default: LOG1 << "Unsupported band width: " << band_width;
    }
}

template <ThreshMode mode, typename... Args>
void dispatch_depth(unsigned depth, Args&... args) {
    switch (depth) {
        case 0: dispatch_width<mode, 0>(args...); break;
        case 1: dispatch_width<mode, 1>(args...); break;
        case 2: dispatch_width<mode, 2>(args...); break;
        case 3: dispatch_width<mode, 3>(args...); break;
        case 4: dispatch_width<mode, 4>(args...); break;
        default: LOG1 << "Unsupported recursion depth: " << depth;
    }
}

template <typename... Args>
void dispatch(ThreshMode mode, Args&... args) {
    switch (mode) {
        case ThreshMode::onebit:
            dispatch_depth<ThreshMode::onebit>(args...);
            break;
        case ThreshMode::twobit:
            dispatch_depth<ThreshMode::twobit>(args...);
            break;
        case ThreshMode::normal:
            dispatch_depth<ThreshMode::normal>(args...);
            break;
        default:
            LOG1 << "Unsupported threshold compression mode: " << (int)mode;
    }
}

int main(int argc, char** argv) {
    tlx::CmdlineParser cmd;
    size_t seed = 42, num_slots = 1024 * 1024;
    unsigned ribbon_width = 32, depth = 3;
    double eps = -1;
    unsigned num_threads = std::thread::hardware_concurrency();
    bool onebit = false, twobit = false, sparsecoeffs = false, cls = false,
         interleaved = false;
    int shift = 0;
    cmd.add_size_t('s', "seed", seed, "random seed");
    cmd.add_size_t('m', "slots", num_slots, "number of slots in the filter");
    cmd.add_unsigned('L', "ribbon_width", ribbon_width, "ribbon width (16/32/64)");
    cmd.add_unsigned('d', "depth", depth, "number of recursive filters");
    cmd.add_double('e', "epsilon", eps, "epsilon, #items = filtersize/(1+epsilon)");
    cmd.add_unsigned('t', "threads", num_threads, "number of query threads");
    cmd.add_int('b', "bsshift", shift,
                "whether to shift bucket size one way or another");
    cmd.add_bool('1', "onebit", onebit,
                 "use one-plus-a-little-bit threshold compression");
    cmd.add_bool('2', "twobit", twobit, "use two-bit threshold compression");
    cmd.add_bool('S', "sparsecoeffs", sparsecoeffs,
                 "use sparse coefficient vectors");
    cmd.add_bool('C', "cls", cls, "use cache-line solution storage");
    cmd.add_bool('I', "interleaved", interleaved,
                 "use interleaved solution storage");
    cmd.add_bool('Q', "noqueries", no_queries,
                 "don't run any queries (for scripting)");

    if (!cmd.process(argc, argv) || (onebit && twobit)) {
        cmd.print_usage();
        return 1;
    }
    if (eps == -1) {
        if (onebit) {
            size_t bucket_size = 1ul << tlx::integer_log2_floor(
                                     ribbon_width * ribbon_width /
                                     (4 * tlx::integer_log2_ceil(ribbon_width)));
            if (shift < 0)
                bucket_size >>= -shift;
            else
                bucket_size <<= shift;
            eps = -0.666 * ribbon_width / (4 * bucket_size + ribbon_width);
        } else {
            // for small ribbon widths, don't go too far from 0
            const double fct = ribbon_width <= 32 ? 3.0 : 4.0;
            eps = -fct / ribbon_width;
        }
    }
    if (seed == 0)
        seed = std::random_device{}();
    cmd.print_result();

    ThreshMode mode = onebit ? ThreshMode::onebit
                             : (twobit ? ThreshMode::twobit : ThreshMode::normal);

    dispatch(mode, depth, ribbon_width, cls, interleaved, sparsecoeffs, shift,
             num_slots, eps, seed, num_threads);
}
back to top