Revision df38c1ce660628f05b4686eeaf0b548295ce7967 authored by Mike Kolupaev on 22 April 2019, 15:17:45 UTC, committed by Facebook Github Bot on 22 April 2019, 15:20:35 UTC
Summary:
Introduce BlockBasedTableOptions::index_shortening to give users control on which key shortening techniques to be used in building index blocks. Before this patch, both separators and successor keys where shortened in indexes. With this patch, the default is set to kShortenSeparators to only shorten the separators. Since each index block has many separators and only one successor (last key), the change should not have negative impact on index block size. However it should prevent many unnecessary block loads where due to approximation introduced by shorted successor, seek would land us to the previous block and then fix it by moving to the next one.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5174

Differential Revision: D14884185

Pulled By: al13n321

fbshipit-source-id: 1b08bc8c03edcf09b6b8c16e9a7eea08ad4dd534
1 parent de76909
Raw File
env_librados_test.cc
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
//  Copyright (c) 2016, Red Hat, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "rocksdb/utilities/env_librados.h"
#include <rados/librados.hpp>
#include "env/mock_env.h"
#include "util/testharness.h"

#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/options.h"
#include "util/random.h"
#include <chrono>
#include <ostream>
#include "rocksdb/utilities/transaction_db.h"

class Timer {
  typedef std::chrono::high_resolution_clock high_resolution_clock;
  typedef std::chrono::milliseconds milliseconds;
public:
  explicit Timer(bool run = false)
  {
    if (run)
      Reset();
  }
  void Reset()
  {
    _start = high_resolution_clock::now();
  }
  milliseconds Elapsed() const
  {
    return std::chrono::duration_cast<milliseconds>(high_resolution_clock::now() - _start);
  }
  template <typename T, typename Traits>
  friend std::basic_ostream<T, Traits>& operator<<(std::basic_ostream<T, Traits>& out, const Timer& timer)
  {
    return out << timer.Elapsed().count();
  }
private:
  high_resolution_clock::time_point _start;
};

namespace rocksdb {

class EnvLibradosTest : public testing::Test {
public:
  // we will use all of these below
  const std::string db_name = "env_librados_test_db";
  const std::string db_pool = db_name + "_pool";
  const char *keyring = "admin";
  const char *config = "../ceph/src/ceph.conf";

  EnvLibrados* env_;
  const EnvOptions soptions_;

  EnvLibradosTest()
    : env_(new EnvLibrados(db_name, config, db_pool)) {
  }
  ~EnvLibradosTest() {
    delete env_;
    librados::Rados rados;
    int ret = 0;
    do {
      ret = rados.init("admin"); // just use the client.admin keyring
      if (ret < 0) { // let's handle any error that might have come back
        std::cerr << "couldn't initialize rados! error " << ret << std::endl;
        ret = EXIT_FAILURE;
        break;
      }

      ret = rados.conf_read_file(config);
      if (ret < 0) {
        // This could fail if the config file is malformed, but it'd be hard.
        std::cerr << "failed to parse config file " << config
                  << "! error" << ret << std::endl;
        ret = EXIT_FAILURE;
        break;
      }

      /*
       * next, we actually connect to the cluster
       */

      ret = rados.connect();
      if (ret < 0) {
        std::cerr << "couldn't connect to cluster! error " << ret << std::endl;
        ret = EXIT_FAILURE;
        break;
      }

      /*
       * And now we're done, so let's remove our pool and then
       * shut down the connection gracefully.
       */
      int delete_ret = rados.pool_delete(db_pool.c_str());
      if (delete_ret < 0) {
        // be careful not to
        std::cerr << "We failed to delete our test pool!" << db_pool << delete_ret << std::endl;
        ret = EXIT_FAILURE;
      }
    } while (0);
  }
};

TEST_F(EnvLibradosTest, Basics) {
  uint64_t file_size;
  std::unique_ptr<WritableFile> writable_file;
  std::vector<std::string> children;

  ASSERT_OK(env_->CreateDir("/dir"));
  // Check that the directory is empty.
  ASSERT_EQ(Status::NotFound(), env_->FileExists("/dir/non_existent"));
  ASSERT_TRUE(!env_->GetFileSize("/dir/non_existent", &file_size).ok());
  ASSERT_OK(env_->GetChildren("/dir", &children));
  ASSERT_EQ(0U, children.size());

  // Create a file.
  ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));
  writable_file.reset();

  // Check that the file exists.
  ASSERT_OK(env_->FileExists("/dir/f"));
  ASSERT_OK(env_->GetFileSize("/dir/f", &file_size));
  ASSERT_EQ(0U, file_size);
  ASSERT_OK(env_->GetChildren("/dir", &children));
  ASSERT_EQ(1U, children.size());
  ASSERT_EQ("f", children[0]);

  // Write to the file.
  ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));
  ASSERT_OK(writable_file->Append("abc"));
  writable_file.reset();


  // Check for expected size.
  ASSERT_OK(env_->GetFileSize("/dir/f", &file_size));
  ASSERT_EQ(3U, file_size);


  // Check that renaming works.
  ASSERT_TRUE(!env_->RenameFile("/dir/non_existent", "/dir/g").ok());
  ASSERT_OK(env_->RenameFile("/dir/f", "/dir/g"));
  ASSERT_EQ(Status::NotFound(), env_->FileExists("/dir/f"));
  ASSERT_OK(env_->FileExists("/dir/g"));
  ASSERT_OK(env_->GetFileSize("/dir/g", &file_size));
  ASSERT_EQ(3U, file_size);

  // Check that opening non-existent file fails.
  std::unique_ptr<SequentialFile> seq_file;
  std::unique_ptr<RandomAccessFile> rand_file;
  ASSERT_TRUE(
    !env_->NewSequentialFile("/dir/non_existent", &seq_file, soptions_).ok());
  ASSERT_TRUE(!seq_file);
  ASSERT_TRUE(!env_->NewRandomAccessFile("/dir/non_existent", &rand_file,
                                         soptions_).ok());
  ASSERT_TRUE(!rand_file);

  // Check that deleting works.
  ASSERT_TRUE(!env_->DeleteFile("/dir/non_existent").ok());
  ASSERT_OK(env_->DeleteFile("/dir/g"));
  ASSERT_EQ(Status::NotFound(), env_->FileExists("/dir/g"));
  ASSERT_OK(env_->GetChildren("/dir", &children));
  ASSERT_EQ(0U, children.size());
  ASSERT_OK(env_->DeleteDir("/dir"));
}

TEST_F(EnvLibradosTest, ReadWrite) {
  std::unique_ptr<WritableFile> writable_file;
  std::unique_ptr<SequentialFile> seq_file;
  std::unique_ptr<RandomAccessFile> rand_file;
  Slice result;
  char scratch[100];

  ASSERT_OK(env_->CreateDir("/dir"));

  ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));
  ASSERT_OK(writable_file->Append("hello "));
  ASSERT_OK(writable_file->Append("world"));
  writable_file.reset();

  // Read sequentially.
  ASSERT_OK(env_->NewSequentialFile("/dir/f", &seq_file, soptions_));
  ASSERT_OK(seq_file->Read(5, &result, scratch));  // Read "hello".
  ASSERT_EQ(0, result.compare("hello"));
  ASSERT_OK(seq_file->Skip(1));
  ASSERT_OK(seq_file->Read(1000, &result, scratch));  // Read "world".
  ASSERT_EQ(0, result.compare("world"));
  ASSERT_OK(seq_file->Read(1000, &result, scratch));  // Try reading past EOF.
  ASSERT_EQ(0U, result.size());
  ASSERT_OK(seq_file->Skip(100));  // Try to skip past end of file.
  ASSERT_OK(seq_file->Read(1000, &result, scratch));
  ASSERT_EQ(0U, result.size());

  // Random reads.
  ASSERT_OK(env_->NewRandomAccessFile("/dir/f", &rand_file, soptions_));
  ASSERT_OK(rand_file->Read(6, 5, &result, scratch));  // Read "world".
  ASSERT_EQ(0, result.compare("world"));
  ASSERT_OK(rand_file->Read(0, 5, &result, scratch));  // Read "hello".
  ASSERT_EQ(0, result.compare("hello"));
  ASSERT_OK(rand_file->Read(10, 100, &result, scratch));  // Read "d".
  ASSERT_EQ(0, result.compare("d"));

  // Too high offset.
  ASSERT_OK(rand_file->Read(1000, 5, &result, scratch));
}

TEST_F(EnvLibradosTest, Locks) {
  FileLock* lock = nullptr;
  std::unique_ptr<WritableFile> writable_file;

  ASSERT_OK(env_->CreateDir("/dir"));

  ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));

  // These are no-ops, but we test they return success.
  ASSERT_OK(env_->LockFile("some file", &lock));
  ASSERT_OK(env_->UnlockFile(lock));

  ASSERT_OK(env_->LockFile("/dir/f", &lock));
  ASSERT_OK(env_->UnlockFile(lock));
}

TEST_F(EnvLibradosTest, Misc) {
  std::string test_dir;
  ASSERT_OK(env_->GetTestDirectory(&test_dir));
  ASSERT_TRUE(!test_dir.empty());

  std::unique_ptr<WritableFile> writable_file;
  ASSERT_TRUE(!env_->NewWritableFile("/a/b", &writable_file, soptions_).ok());

  ASSERT_OK(env_->NewWritableFile("/a", &writable_file, soptions_));
  // These are no-ops, but we test they return success.
  ASSERT_OK(writable_file->Sync());
  ASSERT_OK(writable_file->Flush());
  ASSERT_OK(writable_file->Close());
  writable_file.reset();
}

TEST_F(EnvLibradosTest, LargeWrite) {
  const size_t kWriteSize = 300 * 1024;
  char* scratch = new char[kWriteSize * 2];

  std::string write_data;
  for (size_t i = 0; i < kWriteSize; ++i) {
    write_data.append(1, 'h');
  }

  std::unique_ptr<WritableFile> writable_file;
  ASSERT_OK(env_->CreateDir("/dir"));
  ASSERT_OK(env_->NewWritableFile("/dir/g", &writable_file, soptions_));
  ASSERT_OK(writable_file->Append("foo"));
  ASSERT_OK(writable_file->Append(write_data));
  writable_file.reset();

  std::unique_ptr<SequentialFile> seq_file;
  Slice result;
  ASSERT_OK(env_->NewSequentialFile("/dir/g", &seq_file, soptions_));
  ASSERT_OK(seq_file->Read(3, &result, scratch));  // Read "foo".
  ASSERT_EQ(0, result.compare("foo"));

  size_t read = 0;
  std::string read_data;
  while (read < kWriteSize) {
    ASSERT_OK(seq_file->Read(kWriteSize - read, &result, scratch));
    read_data.append(result.data(), result.size());
    read += result.size();
  }
  ASSERT_TRUE(write_data == read_data);
  delete[] scratch;
}

TEST_F(EnvLibradosTest, FrequentlySmallWrite) {
  const size_t kWriteSize = 1 << 10;
  char* scratch = new char[kWriteSize * 2];

  std::string write_data;
  for (size_t i = 0; i < kWriteSize; ++i) {
    write_data.append(1, 'h');
  }

  std::unique_ptr<WritableFile> writable_file;
  ASSERT_OK(env_->CreateDir("/dir"));
  ASSERT_OK(env_->NewWritableFile("/dir/g", &writable_file, soptions_));
  ASSERT_OK(writable_file->Append("foo"));

  for (size_t i = 0; i < kWriteSize; ++i) {
    ASSERT_OK(writable_file->Append("h"));
  }
  writable_file.reset();

  std::unique_ptr<SequentialFile> seq_file;
  Slice result;
  ASSERT_OK(env_->NewSequentialFile("/dir/g", &seq_file, soptions_));
  ASSERT_OK(seq_file->Read(3, &result, scratch));  // Read "foo".
  ASSERT_EQ(0, result.compare("foo"));

  size_t read = 0;
  std::string read_data;
  while (read < kWriteSize) {
    ASSERT_OK(seq_file->Read(kWriteSize - read, &result, scratch));
    read_data.append(result.data(), result.size());
    read += result.size();
  }
  ASSERT_TRUE(write_data == read_data);
  delete[] scratch;
}

TEST_F(EnvLibradosTest, Truncate) {
  const size_t kWriteSize = 300 * 1024;
  const size_t truncSize = 1024;
  std::string write_data;
  for (size_t i = 0; i < kWriteSize; ++i) {
    write_data.append(1, 'h');
  }

  std::unique_ptr<WritableFile> writable_file;
  ASSERT_OK(env_->CreateDir("/dir"));
  ASSERT_OK(env_->NewWritableFile("/dir/g", &writable_file, soptions_));
  ASSERT_OK(writable_file->Append(write_data));
  ASSERT_EQ(writable_file->GetFileSize(), kWriteSize);
  ASSERT_OK(writable_file->Truncate(truncSize));
  ASSERT_EQ(writable_file->GetFileSize(), truncSize);
  writable_file.reset();
}

TEST_F(EnvLibradosTest, DBBasics) {
  std::string kDBPath = "/tmp/DBBasics";
  DB* db;
  Options options;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options.IncreaseParallelism();
  options.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options.create_if_missing = true;
  options.env = env_;

  // open DB
  Status s = DB::Open(options, kDBPath, &db);
  assert(s.ok());

  // Put key-value
  s = db->Put(WriteOptions(), "key1", "value");
  assert(s.ok());
  std::string value;
  // get value
  s = db->Get(ReadOptions(), "key1", &value);
  assert(s.ok());
  assert(value == "value");

  // atomically apply a set of updates
  {
    WriteBatch batch;
    batch.Delete("key1");
    batch.Put("key2", value);
    s = db->Write(WriteOptions(), &batch);
  }

  s = db->Get(ReadOptions(), "key1", &value);
  assert(s.IsNotFound());

  db->Get(ReadOptions(), "key2", &value);
  assert(value == "value");

  delete db;
}

TEST_F(EnvLibradosTest, DBLoadKeysInRandomOrder) {
  char key[20] = {0}, value[20] = {0};
  int max_loop = 1 << 10;
  Timer timer(false);
  std::cout << "Test size : loop(" << max_loop << ")" << std::endl;
  /**********************************
            use default env
  ***********************************/
  std::string kDBPath1 = "/tmp/DBLoadKeysInRandomOrder1";
  DB* db1;
  Options options1;
  // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
  options1.IncreaseParallelism();
  options1.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options1.create_if_missing = true;

  // open DB
  Status s1 = DB::Open(options1, kDBPath1, &db1);
  assert(s1.ok());

  rocksdb::Random64 r1(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    snprintf(key,
             20,
             "%16lx",
             (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
    snprintf(value,
             20,
             "%16lx",
             (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
    // Put key-value
    s1 = db1->Put(WriteOptions(), key, value);
    assert(s1.ok());
  }
  std::cout << "Time by default : " << timer << "ms" << std::endl;
  delete db1;

  /**********************************
            use librados env
  ***********************************/
  std::string kDBPath2 = "/tmp/DBLoadKeysInRandomOrder2";
  DB* db2;
  Options options2;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options2.IncreaseParallelism();
  options2.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options2.create_if_missing = true;
  options2.env = env_;

  // open DB
  Status s2 = DB::Open(options2, kDBPath2, &db2);
  assert(s2.ok());

  rocksdb::Random64 r2(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    snprintf(key,
             20,
             "%16lx",
             (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
    snprintf(value,
             20,
             "%16lx",
             (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
    // Put key-value
    s2 = db2->Put(WriteOptions(), key, value);
    assert(s2.ok());
  }
  std::cout << "Time by librados : " << timer << "ms" << std::endl;
  delete db2;
}

TEST_F(EnvLibradosTest, DBBulkLoadKeysInRandomOrder) {
  char key[20] = {0}, value[20] = {0};
  int max_loop = 1 << 6;
  int bulk_size = 1 << 15;
  Timer timer(false);
  std::cout << "Test size : loop(" << max_loop << "); bulk_size(" << bulk_size << ")" << std::endl;
  /**********************************
            use default env
  ***********************************/
  std::string kDBPath1 = "/tmp/DBBulkLoadKeysInRandomOrder1";
  DB* db1;
  Options options1;
  // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
  options1.IncreaseParallelism();
  options1.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options1.create_if_missing = true;

  // open DB
  Status s1 = DB::Open(options1, kDBPath1, &db1);
  assert(s1.ok());

  rocksdb::Random64 r1(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      snprintf(key,
               20,
               "%16lx",
               (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
      snprintf(value,
               20,
               "%16lx",
               (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
      batch.Put(key, value);
    }
    s1 = db1->Write(WriteOptions(), &batch);
    assert(s1.ok());
  }
  std::cout << "Time by default : " << timer << "ms" << std::endl;
  delete db1;

  /**********************************
            use librados env
  ***********************************/
  std::string kDBPath2 = "/tmp/DBBulkLoadKeysInRandomOrder2";
  DB* db2;
  Options options2;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options2.IncreaseParallelism();
  options2.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options2.create_if_missing = true;
  options2.env = env_;

  // open DB
  Status s2 = DB::Open(options2, kDBPath2, &db2);
  assert(s2.ok());

  rocksdb::Random64 r2(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      snprintf(key,
               20,
               "%16lx",
               (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
      snprintf(value,
               20,
               "%16lx",
               (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
      batch.Put(key, value);
    }
    s2 = db2->Write(WriteOptions(), &batch);
    assert(s2.ok());
  }
  std::cout << "Time by librados : " << timer << "ms" << std::endl;
  delete db2;
}

TEST_F(EnvLibradosTest, DBBulkLoadKeysInSequentialOrder) {
  char key[20] = {0}, value[20] = {0};
  int max_loop = 1 << 6;
  int bulk_size = 1 << 15;
  Timer timer(false);
  std::cout << "Test size : loop(" << max_loop << "); bulk_size(" << bulk_size << ")" << std::endl;
  /**********************************
            use default env
  ***********************************/
  std::string kDBPath1 = "/tmp/DBBulkLoadKeysInSequentialOrder1";
  DB* db1;
  Options options1;
  // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
  options1.IncreaseParallelism();
  options1.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options1.create_if_missing = true;

  // open DB
  Status s1 = DB::Open(options1, kDBPath1, &db1);
  assert(s1.ok());

  rocksdb::Random64 r1(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      snprintf(key,
               20,
               "%019lld",
               (long long)(i * bulk_size + j));
      snprintf(value,
               20,
               "%16lx",
               (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
      batch.Put(key, value);
    }
    s1 = db1->Write(WriteOptions(), &batch);
    assert(s1.ok());
  }
  std::cout << "Time by default : " << timer << "ms" << std::endl;
  delete db1;

  /**********************************
            use librados env
  ***********************************/
  std::string kDBPath2 = "/tmp/DBBulkLoadKeysInSequentialOrder2";
  DB* db2;
  Options options2;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options2.IncreaseParallelism();
  options2.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options2.create_if_missing = true;
  options2.env = env_;

  // open DB
  Status s2 = DB::Open(options2, kDBPath2, &db2);
  assert(s2.ok());

  rocksdb::Random64 r2(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      snprintf(key,
               20,
               "%16lx",
               (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
      snprintf(value,
               20,
               "%16lx",
               (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
      batch.Put(key, value);
    }
    s2 = db2->Write(WriteOptions(), &batch);
    assert(s2.ok());
  }
  std::cout << "Time by librados : " << timer << "ms" << std::endl;
  delete db2;
}

TEST_F(EnvLibradosTest, DBRandomRead) {
  char key[20] = {0}, value[20] = {0};
  int max_loop = 1 << 6;
  int bulk_size = 1 << 10;
  int read_loop = 1 << 20;
  Timer timer(false);
  std::cout << "Test size : keys_num(" << max_loop << ", " << bulk_size << "); read_loop(" << read_loop << ")" << std::endl;
  /**********************************
            use default env
  ***********************************/
  std::string kDBPath1 = "/tmp/DBRandomRead1";
  DB* db1;
  Options options1;
  // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
  options1.IncreaseParallelism();
  options1.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options1.create_if_missing = true;

  // open DB
  Status s1 = DB::Open(options1, kDBPath1, &db1);
  assert(s1.ok());

  rocksdb::Random64 r1(time(nullptr));


  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      snprintf(key,
               20,
               "%019lld",
               (long long)(i * bulk_size + j));
      snprintf(value,
               20,
               "%16lx",
               (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
      batch.Put(key, value);
    }
    s1 = db1->Write(WriteOptions(), &batch);
    assert(s1.ok());
  }
  timer.Reset();
  int base1 = 0, offset1 = 0;
  for (int i = 0; i < read_loop; ++i) {
    base1 = r1.Uniform(max_loop);
    offset1 = r1.Uniform(bulk_size);
    std::string value1;
    snprintf(key,
             20,
             "%019lld",
             (long long)(base1 * bulk_size + offset1));
    s1 = db1->Get(ReadOptions(), key, &value1);
    assert(s1.ok());
  }
  std::cout << "Time by default : " << timer << "ms" << std::endl;
  delete db1;

  /**********************************
            use librados env
  ***********************************/
  std::string kDBPath2 = "/tmp/DBRandomRead2";
  DB* db2;
  Options options2;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options2.IncreaseParallelism();
  options2.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options2.create_if_missing = true;
  options2.env = env_;

  // open DB
  Status s2 = DB::Open(options2, kDBPath2, &db2);
  assert(s2.ok());

  rocksdb::Random64 r2(time(nullptr));

  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      snprintf(key,
               20,
               "%019lld",
               (long long)(i * bulk_size + j));
      snprintf(value,
               20,
               "%16lx",
               (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
      batch.Put(key, value);
    }
    s2 = db2->Write(WriteOptions(), &batch);
    assert(s2.ok());
  }

  timer.Reset();
  int base2 = 0, offset2 = 0;
  for (int i = 0; i < read_loop; ++i) {
    base2 = r2.Uniform(max_loop);
    offset2 = r2.Uniform(bulk_size);
    std::string value2;
    snprintf(key,
             20,
             "%019lld",
             (long long)(base2 * bulk_size + offset2));
    s2 = db2->Get(ReadOptions(), key, &value2);
    if (!s2.ok()) {
      std::cout << s2.ToString() << std::endl;
    }
    assert(s2.ok());
  }
  std::cout << "Time by librados : " << timer << "ms" << std::endl;
  delete db2;
}

class EnvLibradosMutipoolTest : public testing::Test {
public:
  // we will use all of these below
  const std::string client_name = "client.admin";
  const std::string cluster_name = "ceph";
  const uint64_t flags = 0;
  const std::string db_name = "env_librados_test_db";
  const std::string db_pool = db_name + "_pool";
  const std::string wal_dir = "/wal";
  const std::string wal_pool = db_name + "_wal_pool";
  const size_t write_buffer_size = 1 << 20;
  const char *keyring = "admin";
  const char *config = "../ceph/src/ceph.conf";

  EnvLibrados* env_;
  const EnvOptions soptions_;

  EnvLibradosMutipoolTest() {
    env_ = new EnvLibrados(client_name, cluster_name, flags, db_name, config, db_pool, wal_dir, wal_pool, write_buffer_size);
  }
  ~EnvLibradosMutipoolTest() {
    delete env_;
    librados::Rados rados;
    int ret = 0;
    do {
      ret = rados.init("admin"); // just use the client.admin keyring
      if (ret < 0) { // let's handle any error that might have come back
        std::cerr << "couldn't initialize rados! error " << ret << std::endl;
        ret = EXIT_FAILURE;
        break;
      }

      ret = rados.conf_read_file(config);
      if (ret < 0) {
        // This could fail if the config file is malformed, but it'd be hard.
        std::cerr << "failed to parse config file " << config
                  << "! error" << ret << std::endl;
        ret = EXIT_FAILURE;
        break;
      }

      /*
       * next, we actually connect to the cluster
       */

      ret = rados.connect();
      if (ret < 0) {
        std::cerr << "couldn't connect to cluster! error " << ret << std::endl;
        ret = EXIT_FAILURE;
        break;
      }

      /*
       * And now we're done, so let's remove our pool and then
       * shut down the connection gracefully.
       */
      int delete_ret = rados.pool_delete(db_pool.c_str());
      if (delete_ret < 0) {
        // be careful not to
        std::cerr << "We failed to delete our test pool!" << db_pool << delete_ret << std::endl;
        ret = EXIT_FAILURE;
      }
      delete_ret = rados.pool_delete(wal_pool.c_str());
      if (delete_ret < 0) {
        // be careful not to
        std::cerr << "We failed to delete our test pool!" << wal_pool << delete_ret << std::endl;
        ret = EXIT_FAILURE;
      }
    } while (0);
  }
};

TEST_F(EnvLibradosMutipoolTest, Basics) {
  uint64_t file_size;
  std::unique_ptr<WritableFile> writable_file;
  std::vector<std::string> children;
  std::vector<std::string> v = {"/tmp/dir1", "/tmp/dir2", "/tmp/dir3", "/tmp/dir4", "dir"};

  for (size_t i = 0; i < v.size(); ++i) {
    std::string dir = v[i];
    std::string dir_non_existent = dir + "/non_existent";
    std::string dir_f = dir + "/f";
    std::string dir_g = dir + "/g";

    ASSERT_OK(env_->CreateDir(dir.c_str()));
    // Check that the directory is empty.
    ASSERT_EQ(Status::NotFound(), env_->FileExists(dir_non_existent.c_str()));
    ASSERT_TRUE(!env_->GetFileSize(dir_non_existent.c_str(), &file_size).ok());
    ASSERT_OK(env_->GetChildren(dir.c_str(), &children));
    ASSERT_EQ(0U, children.size());

    // Create a file.
    ASSERT_OK(env_->NewWritableFile(dir_f.c_str(), &writable_file, soptions_));
    writable_file.reset();

    // Check that the file exists.
    ASSERT_OK(env_->FileExists(dir_f.c_str()));
    ASSERT_OK(env_->GetFileSize(dir_f.c_str(), &file_size));
    ASSERT_EQ(0U, file_size);
    ASSERT_OK(env_->GetChildren(dir.c_str(), &children));
    ASSERT_EQ(1U, children.size());
    ASSERT_EQ("f", children[0]);

    // Write to the file.
    ASSERT_OK(env_->NewWritableFile(dir_f.c_str(), &writable_file, soptions_));
    ASSERT_OK(writable_file->Append("abc"));
    writable_file.reset();


    // Check for expected size.
    ASSERT_OK(env_->GetFileSize(dir_f.c_str(), &file_size));
    ASSERT_EQ(3U, file_size);


    // Check that renaming works.
    ASSERT_TRUE(!env_->RenameFile(dir_non_existent.c_str(), dir_g.c_str()).ok());
    ASSERT_OK(env_->RenameFile(dir_f.c_str(), dir_g.c_str()));
    ASSERT_EQ(Status::NotFound(), env_->FileExists(dir_f.c_str()));
    ASSERT_OK(env_->FileExists(dir_g.c_str()));
    ASSERT_OK(env_->GetFileSize(dir_g.c_str(), &file_size));
    ASSERT_EQ(3U, file_size);

    // Check that opening non-existent file fails.
    std::unique_ptr<SequentialFile> seq_file;
    std::unique_ptr<RandomAccessFile> rand_file;
    ASSERT_TRUE(
      !env_->NewSequentialFile(dir_non_existent.c_str(), &seq_file, soptions_).ok());
    ASSERT_TRUE(!seq_file);
    ASSERT_TRUE(!env_->NewRandomAccessFile(dir_non_existent.c_str(), &rand_file,
                                           soptions_).ok());
    ASSERT_TRUE(!rand_file);

    // Check that deleting works.
    ASSERT_TRUE(!env_->DeleteFile(dir_non_existent.c_str()).ok());
    ASSERT_OK(env_->DeleteFile(dir_g.c_str()));
    ASSERT_EQ(Status::NotFound(), env_->FileExists(dir_g.c_str()));
    ASSERT_OK(env_->GetChildren(dir.c_str(), &children));
    ASSERT_EQ(0U, children.size());
    ASSERT_OK(env_->DeleteDir(dir.c_str()));
  }
}

TEST_F(EnvLibradosMutipoolTest, DBBasics) {
  std::string kDBPath = "/tmp/DBBasics";
  std::string walPath = "/tmp/wal";
  DB* db;
  Options options;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options.IncreaseParallelism();
  options.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options.create_if_missing = true;
  options.env = env_;
  options.wal_dir = walPath;

  // open DB
  Status s = DB::Open(options, kDBPath, &db);
  assert(s.ok());

  // Put key-value
  s = db->Put(WriteOptions(), "key1", "value");
  assert(s.ok());
  std::string value;
  // get value
  s = db->Get(ReadOptions(), "key1", &value);
  assert(s.ok());
  assert(value == "value");

  // atomically apply a set of updates
  {
    WriteBatch batch;
    batch.Delete("key1");
    batch.Put("key2", value);
    s = db->Write(WriteOptions(), &batch);
  }

  s = db->Get(ReadOptions(), "key1", &value);
  assert(s.IsNotFound());

  db->Get(ReadOptions(), "key2", &value);
  assert(value == "value");

  delete db;
}

TEST_F(EnvLibradosMutipoolTest, DBBulkLoadKeysInRandomOrder) {
  char key[20] = {0}, value[20] = {0};
  int max_loop = 1 << 6;
  int bulk_size = 1 << 15;
  Timer timer(false);
  std::cout << "Test size : loop(" << max_loop << "); bulk_size(" << bulk_size << ")" << std::endl;
  /**********************************
            use default env
  ***********************************/
  std::string kDBPath1 = "/tmp/DBBulkLoadKeysInRandomOrder1";
  std::string walPath = "/tmp/wal";
  DB* db1;
  Options options1;
  // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
  options1.IncreaseParallelism();
  options1.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options1.create_if_missing = true;

  // open DB
  Status s1 = DB::Open(options1, kDBPath1, &db1);
  assert(s1.ok());

  rocksdb::Random64 r1(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      snprintf(key,
               20,
               "%16lx",
               (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
      snprintf(value,
               20,
               "%16lx",
               (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
      batch.Put(key, value);
    }
    s1 = db1->Write(WriteOptions(), &batch);
    assert(s1.ok());
  }
  std::cout << "Time by default : " << timer << "ms" << std::endl;
  delete db1;

  /**********************************
            use librados env
  ***********************************/
  std::string kDBPath2 = "/tmp/DBBulkLoadKeysInRandomOrder2";
  DB* db2;
  Options options2;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options2.IncreaseParallelism();
  options2.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options2.create_if_missing = true;
  options2.env = env_;
  options2.wal_dir = walPath;

  // open DB
  Status s2 = DB::Open(options2, kDBPath2, &db2);
  if (!s2.ok()) {
    std::cerr << s2.ToString() << std::endl;
  }
  assert(s2.ok());

  rocksdb::Random64 r2(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      snprintf(key,
               20,
               "%16lx",
               (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
      snprintf(value,
               20,
               "%16lx",
               (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
      batch.Put(key, value);
    }
    s2 = db2->Write(WriteOptions(), &batch);
    assert(s2.ok());
  }
  std::cout << "Time by librados : " << timer << "ms" << std::endl;
  delete db2;
}

TEST_F(EnvLibradosMutipoolTest, DBTransactionDB) {
  std::string kDBPath = "/tmp/DBTransactionDB";
  // open DB
  Options options;
  TransactionDBOptions txn_db_options;
  options.create_if_missing = true;
  options.env = env_;
  TransactionDB* txn_db;

  Status s = TransactionDB::Open(options, txn_db_options, kDBPath, &txn_db);
  assert(s.ok());

  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  std::string value;

  ////////////////////////////////////////////////////////
  //
  // Simple OptimisticTransaction Example ("Read Committed")
  //
  ////////////////////////////////////////////////////////

  // Start a transaction
  Transaction* txn = txn_db->BeginTransaction(write_options);
  assert(txn);

  // Read a key in this transaction
  s = txn->Get(read_options, "abc", &value);
  assert(s.IsNotFound());

  // Write a key in this transaction
  s = txn->Put("abc", "def");
  assert(s.ok());

  // Read a key OUTSIDE this transaction. Does not affect txn.
  s = txn_db->Get(read_options, "abc", &value);

  // Write a key OUTSIDE of this transaction.
  // Does not affect txn since this is an unrelated key.  If we wrote key 'abc'
  // here, the transaction would fail to commit.
  s = txn_db->Put(write_options, "xyz", "zzz");

  // Commit transaction
  s = txn->Commit();
  assert(s.ok());
  delete txn;

  ////////////////////////////////////////////////////////
  //
  // "Repeatable Read" (Snapshot Isolation) Example
  //   -- Using a single Snapshot
  //
  ////////////////////////////////////////////////////////

  // Set a snapshot at start of transaction by setting set_snapshot=true
  txn_options.set_snapshot = true;
  txn = txn_db->BeginTransaction(write_options, txn_options);

  const Snapshot* snapshot = txn->GetSnapshot();

  // Write a key OUTSIDE of transaction
  s = txn_db->Put(write_options, "abc", "xyz");
  assert(s.ok());

  // Attempt to read a key using the snapshot.  This will fail since
  // the previous write outside this txn conflicts with this read.
  read_options.snapshot = snapshot;
  s = txn->GetForUpdate(read_options, "abc", &value);
  assert(s.IsBusy());

  txn->Rollback();

  delete txn;
  // Clear snapshot from read options since it is no longer valid
  read_options.snapshot = nullptr;
  snapshot = nullptr;

  ////////////////////////////////////////////////////////
  //
  // "Read Committed" (Monotonic Atomic Views) Example
  //   --Using multiple Snapshots
  //
  ////////////////////////////////////////////////////////

  // In this example, we set the snapshot multiple times.  This is probably
  // only necessary if you have very strict isolation requirements to
  // implement.

  // Set a snapshot at start of transaction
  txn_options.set_snapshot = true;
  txn = txn_db->BeginTransaction(write_options, txn_options);

  // Do some reads and writes to key "x"
  read_options.snapshot = txn_db->GetSnapshot();
  s = txn->Get(read_options, "x", &value);
  txn->Put("x", "x");

  // Do a write outside of the transaction to key "y"
  s = txn_db->Put(write_options, "y", "y");

  // Set a new snapshot in the transaction
  txn->SetSnapshot();
  txn->SetSavePoint();
  read_options.snapshot = txn_db->GetSnapshot();

  // Do some reads and writes to key "y"
  // Since the snapshot was advanced, the write done outside of the
  // transaction does not conflict.
  s = txn->GetForUpdate(read_options, "y", &value);
  txn->Put("y", "y");

  // Decide we want to revert the last write from this transaction.
  txn->RollbackToSavePoint();

  // Commit.
  s = txn->Commit();
  assert(s.ok());
  delete txn;
  // Clear snapshot from read options since it is no longer valid
  read_options.snapshot = nullptr;

  // Cleanup
  delete txn_db;
  DestroyDB(kDBPath, options);
}

}  // namespace rocksdb

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

#else
#include <stdio.h>

int main(int argc, char** argv) {
  fprintf(stderr, "SKIPPED as EnvMirror is not supported in ROCKSDB_LITE\n");
  return 0;
}

#endif  // !ROCKSDB_LITE
back to top