Revision 85dbdf2586d04d0d833592c26cbede75d8c923d3 authored by Peter Dillinger on 13 March 2020, 20:36:45 UTC, committed by Facebook GitHub Bot on 13 March 2020, 20:39:03 UTC
Summary:
After we had a lot of failures with maven.org downloads, we
wanted an alternative location for downloading binary dependencies.
Hosting them through github would have been good in terms of
organizational and network dependencies, but that approach seems to be
awkward (fake releases, so would need a 'rocksdb-deps' repo) and
strangely complicated for Facebook policy on open source repositories.

This commit moves the downloads (that are not officially hosted by
others on github) from my personal rocksdb fork to an S3 bucket owned
by the Facebook RocksDB AWS account. Facebook employees can access
this through an internal tool, and we should be able to grant permission
to outside collaborators.

Assuming this works out, I will back-port to older branches to stabilize
their CI testing as well.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6526

Test Plan: CI

Differential Revision: D20430130

Pulled By: pdillinger

fbshipit-source-id: df52394a65e0a57942db3039bdaade8a4d520cb2
1 parent 5c30e6c
Raw File
compact_files_example.cc
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// An example code demonstrating how to use CompactFiles, EventListener,
// and GetColumnFamilyMetaData APIs to implement custom compaction algorithm.

#include <cassert>
#include <cstdio>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"

// Bring the RocksDB API names used below (DB, Options, Status, ...) into
// scope.  ROCKSDB_NAMESPACE is presumably a macro supplied by the RocksDB
// headers that expands to the library's namespace name.
using namespace ROCKSDB_NAMESPACE;
// Filesystem path of the example database; main() destroys any existing DB
// at this path and then opens a fresh one there.
std::string kDBPath = "/tmp/rocksdb_compact_files_example";
// Forward declaration so that Compactor can mention CompactionTask* before
// the struct itself is defined.
struct CompactionTask;

// Example interface for a compaction policy that lives outside the RocksDB
// core.  An implementation derives from EventListener so it can react to DB
// events, and drives compactions itself through RocksDB's pluggable
// compaction APIs.
class Compactor : public EventListener {
 public:
  // Chooses the next compaction to run for column family `cf_name` of `db`.
  // The caller owns the returned CompactionTask and is responsible for
  // destroying it.  Yields nullptr when no suitable compaction can be found.
  virtual CompactionTask* PickCompaction(DB* db,
                                         const std::string& cf_name) = 0;

  // Queues `task` so that it is executed in the background.
  virtual void ScheduleCompaction(CompactionTask* task) = 0;
};

// Example structure that describes a compaction task: which input files of
// which DB to compact, the level the output goes to, the options to compact
// with, and whether a failed attempt should be retried.
//
// In this example a task is created in one callback and consumed later from
// a scheduled background function, so it must not refer to data whose
// lifetime is controlled by the code that created it.
struct CompactionTask {
  // _db and _compactor are borrowed: the task stores the pointers but never
  // deletes them.  All other arguments are copied into the task.
  CompactionTask(DB* _db, Compactor* _compactor,
                 const std::string& _column_family_name,
                 const std::vector<std::string>& _input_file_names,
                 const int _output_level,
                 const CompactionOptions& _compact_options,
                 bool _retry_on_fail)
      : db(_db),
        compactor(_compactor),
        column_family_name(_column_family_name),
        input_file_names(_input_file_names),
        output_level(_output_level),
        compact_options(_compact_options),
        retry_on_fail(_retry_on_fail) {}

  DB* db;                // DB to compact (not owned).
  Compactor* compactor;  // Compactor used to pick a retry task (not owned).
  // Stored by value on purpose.  Keeping only a reference here would dangle
  // once the string passed to the constructor (for example the cf_name field
  // of a listener callback argument) is destroyed, while the task may still
  // be waiting to run.
  std::string column_family_name;
  std::vector<std::string> input_file_names;  // Files to compact.
  int output_level;                           // Level receiving the output.
  CompactionOptions compact_options;          // Options passed to CompactFiles.
  bool retry_on_fail;  // Whether to pick a new task if this one fails.
};

// A simple compaction algorithm that always compacts everything
// to the highest level whenever possible.
//
// It reacts to flush events: after each completed flush it tries to build a
// task covering every file listed in the column family metadata and hands
// that task to the Env's scheduler.
class FullCompactor : public Compactor {
 public:
  // Keeps a private copy of `options` and derives from it the
  // CompactionOptions (compression type and output file size limit) that are
  // attached to every task this compactor creates.
  explicit FullCompactor(const Options& options) : options_(options) {
    compact_options_.compression = options_.compression;
    compact_options_.output_file_size_limit = options_.target_file_size_base;
  }

  // When flush happens, it determines whether to trigger compaction. If
  // info.triggered_writes_stop is true, it will also set the retry flag of
  // the compaction task to true.
  void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
    CompactionTask* task = PickCompaction(db, info.cf_name);
    if (task == nullptr) {
      return;
    }
    if (info.triggered_writes_stop) {
      task->retry_on_fail = true;
    }
    // Schedule compaction in a different thread.  Ownership of `task` passes
    // to CompactFiles(), which deletes it when it finishes.
    ScheduleCompaction(task);
  }

  // Always pick a compaction which includes all files whenever possible.
  // Returns nullptr if any listed file is currently being compacted;
  // otherwise returns a new task, owned by the caller, that compacts every
  // listed file into level `num_levels - 1`.
  // NOTE(review): the metadata is fetched without passing cf_name, so this
  // presumably inspects the default column family only -- confirm before
  // using this class with multiple column families.
  CompactionTask* PickCompaction(DB* db, const std::string& cf_name) override {
    ColumnFamilyMetaData cf_meta;
    db->GetColumnFamilyMetaData(&cf_meta);

    std::vector<std::string> input_file_names;
    // Bind by const reference: iterating by value would copy every level
    // (including its whole file list) and every file's metadata.
    for (const auto& level : cf_meta.levels) {
      for (const auto& file : level.files) {
        if (file.being_compacted) {
          return nullptr;
        }
        input_file_names.push_back(file.name);
      }
    }
    return new CompactionTask(db, this, cf_name, input_file_names,
                              options_.num_levels - 1, compact_options_,
                              false);
  }

  // Schedule the specified compaction task in background.  `task` must not be
  // nullptr; it is handed to CompactFiles() as its opaque argument.
  void ScheduleCompaction(CompactionTask* task) override {
    options_.env->Schedule(&FullCompactor::CompactFiles, task);
  }

  // Entry point run by the scheduler.  `arg` must point to a CompactionTask;
  // this function takes ownership of it and frees it on return.  Runs
  // DB::CompactFiles() with the task's parameters and prints the resulting
  // status.
  static void CompactFiles(void* arg) {
    std::unique_ptr<CompactionTask> task(static_cast<CompactionTask*>(arg));
    assert(task);
    assert(task->db);
    Status s = task->db->CompactFiles(task->compact_options,
                                      task->input_file_names,
                                      task->output_level);
    printf("CompactFiles() finished with status %s\n", s.ToString().c_str());
    if (!s.ok() && !s.IsIOError() && task->retry_on_fail) {
      // If a compaction task with its retry_on_fail=true failed,
      // try to schedule another compaction in case the reason
      // is not an IO error.
      CompactionTask* new_task = task->compactor->PickCompaction(
          task->db, task->column_family_name);
      // PickCompaction() may return nullptr (e.g. a file is already being
      // compacted).  Scheduling a null task would make the next
      // CompactFiles() call dereference a null pointer, so skip the retry.
      if (new_task != nullptr) {
        task->compactor->ScheduleCompaction(new_task);
      }
    }
  }

 private:
  Options options_;                    // Private copy of the DB options.
  CompactionOptions compact_options_;  // Options applied to every task.
};

int main() {
  Options options;
  options.create_if_missing = true;
  // Disable RocksDB background compaction.
  options.compaction_style = kCompactionStyleNone;
  // Small slowdown and stop trigger for experimental purpose.
  options.level0_slowdown_writes_trigger = 3;
  options.level0_stop_writes_trigger = 5;
  options.IncreaseParallelism(5);
  options.listeners.emplace_back(new FullCompactor(options));

  DB* db = nullptr;
  DestroyDB(kDBPath, options);
  Status s = DB::Open(options, kDBPath, &db);
  assert(s.ok());
  assert(db);

  // if background compaction is not working, write will stall
  // because of options.level0_stop_writes_trigger
  for (int i = 1000; i < 99999; ++i) {
    db->Put(WriteOptions(), std::to_string(i),
                            std::string(500, 'a' + (i % 26)));
  }

  // verify the values are still there
  std::string value;
  for (int i = 1000; i < 99999; ++i) {
    db->Get(ReadOptions(), std::to_string(i),
                           &value);
    assert(value == std::string(500, 'a' + (i % 26)));
  }

  // close the db.
  delete db;

  return 0;
}
back to top