Revision 46152d53bf58748fc3ed0681d8970c342bcfc47a authored by Andrew Kryczka on 30 April 2018, 19:23:45 UTC, committed by Facebook Github Bot on 30 April 2018, 19:27:34 UTC
Summary:
- Original commit: a4fb1f8c049ee9d61a9da8cf23b64d2c7d36a33f
- Revert commit (we reverted as a quick fix to get crash tests passing): 6afe22db2e667799d8c903db61750d676bffe152

This PR includes the contents of the original commit plus two bug fixes, which are:

- In whitebox crash test, only set `--expected_values_path` for `db_stress` runs in the first half of the crash test's duration. In the second half, a fresh DB is created for each `db_stress` run, so we cannot maintain expected state across `db_stress` runs.
- Made `Exists()` return true for `UNKNOWN_SENTINEL` values. I previously had an assert in `Exists()` that value was not `UNKNOWN_SENTINEL`. But it is possible for post-crash-recovery expected values to be `UNKNOWN_SENTINEL` (i.e., if the crash happens in the middle of an update), in which case this assertion would be tripped. The effect of returning true in this case is there may be cases where a `SingleDelete` deletes no data. But if we had returned false, the effect would be calling `SingleDelete` on a key with multiple older versions, which is not supported.
Closes https://github.com/facebook/rocksdb/pull/3793

Differential Revision: D7811671

Pulled By: ajkr

fbshipit-source-id: 67e0295bfb1695ff9674837f2e05bb29c50efc30
1 parent 282099f
Raw File
compaction_filter_example.cc
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include <rocksdb/compaction_filter.h>
#include <rocksdb/db.h>
#include <rocksdb/merge_operator.h>
#include <rocksdb/options.h>

class MyMerge : public rocksdb::MergeOperator {
 public:
  virtual bool FullMergeV2(const MergeOperationInput& merge_in,
                           MergeOperationOutput* merge_out) const override {
    merge_out->new_value.clear();
    if (merge_in.existing_value != nullptr) {
      merge_out->new_value.assign(merge_in.existing_value->data(),
                                  merge_in.existing_value->size());
    }
    for (const rocksdb::Slice& m : merge_in.operand_list) {
      fprintf(stderr, "Merge(%s)\n", m.ToString().c_str());
      // the compaction filter filters out bad values
      assert(m.ToString() != "bad");
      merge_out->new_value.assign(m.data(), m.size());
    }
    return true;
  }

  const char* Name() const override { return "MyMerge"; }
};

class MyFilter : public rocksdb::CompactionFilter {
 public:
  bool Filter(int level, const rocksdb::Slice& key,
              const rocksdb::Slice& existing_value, std::string* new_value,
              bool* value_changed) const override {
    fprintf(stderr, "Filter(%s)\n", key.ToString().c_str());
    ++count_;
    assert(*value_changed == false);
    return false;
  }

  bool FilterMergeOperand(int level, const rocksdb::Slice& key,
                          const rocksdb::Slice& existing_value) const override {
    fprintf(stderr, "FilterMerge(%s)\n", key.ToString().c_str());
    ++merge_count_;
    return existing_value == "bad";
  }

  const char* Name() const override { return "MyFilter"; }

  mutable int count_ = 0;
  mutable int merge_count_ = 0;
};

int main() {
  rocksdb::DB* raw_db;
  rocksdb::Status status;

  MyFilter filter;

  int ret = system("rm -rf /tmp/rocksmergetest");
  if (ret != 0) {
    fprintf(stderr, "Error deleting /tmp/rocksmergetest, code: %d\n", ret);
    return ret;
  }
  rocksdb::Options options;
  options.create_if_missing = true;
  options.merge_operator.reset(new MyMerge);
  options.compaction_filter = &filter;
  status = rocksdb::DB::Open(options, "/tmp/rocksmergetest", &raw_db);
  assert(status.ok());
  std::unique_ptr<rocksdb::DB> db(raw_db);

  rocksdb::WriteOptions wopts;
  db->Merge(wopts, "0", "bad");  // This is filtered out
  db->Merge(wopts, "1", "data1");
  db->Merge(wopts, "1", "bad");
  db->Merge(wopts, "1", "data2");
  db->Merge(wopts, "1", "bad");
  db->Merge(wopts, "3", "data3");
  db->CompactRange(rocksdb::CompactRangeOptions(), nullptr, nullptr);
  fprintf(stderr, "filter.count_ = %d\n", filter.count_);
  assert(filter.count_ == 0);
  fprintf(stderr, "filter.merge_count_ = %d\n", filter.merge_count_);
  assert(filter.merge_count_ == 6);
}
back to top