https://github.com/facebook/rocksdb
Raw File
Tip revision: ed4316166f67ec892603014634840d29f460f611 authored by Andrew Kryczka on 01 December 2020, 23:05:35 UTC
fixup release note (track_and_verify_wals_in_manifest not available in 6.14)
Tip revision: ed43161
db_sst_test.cc
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_test_util.h"
#include "file/sst_file_manager_impl.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/sst_file_manager.h"
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {

class DBSSTTest : public DBTestBase {
 public:
  DBSSTTest() : DBTestBase("/db_sst_test", /*env_do_fsync=*/true) {}
};

#ifndef ROCKSDB_LITE
// A class which remembers the name of each flushed file.
class FlushedFileCollector : public EventListener {
 public:
  FlushedFileCollector() {}
  ~FlushedFileCollector() override {}

  void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
    std::lock_guard<std::mutex> lock(mutex_);
    flushed_files_.push_back(info.file_path);
  }

  std::vector<std::string> GetFlushedFiles() {
    std::lock_guard<std::mutex> lock(mutex_);
    std::vector<std::string> result;
    for (auto fname : flushed_files_) {
      result.push_back(fname);
    }
    return result;
  }
  void ClearFlushedFiles() {
    std::lock_guard<std::mutex> lock(mutex_);
    flushed_files_.clear();
  }

 private:
  std::vector<std::string> flushed_files_;
  std::mutex mutex_;
};
#endif  // ROCKSDB_LITE

TEST_F(DBSSTTest, DontDeletePendingOutputs) {
  Options options;
  options.env = env_;
  options.create_if_missing = true;
  DestroyAndReopen(options);

  // Every time we write to a table file, call FOF/POF with full DB scan. This
  // will make sure our pending_outputs_ protection work correctly
  std::function<void()> purge_obsolete_files_function = [&]() {
    JobContext job_context(0);
    dbfull()->TEST_LockMutex();
    dbfull()->FindObsoleteFiles(&job_context, true /*force*/);
    dbfull()->TEST_UnlockMutex();
    dbfull()->PurgeObsoleteFiles(job_context);
    job_context.Clean();
  };

  env_->table_write_callback_ = &purge_obsolete_files_function;

  for (int i = 0; i < 2; ++i) {
    ASSERT_OK(Put("a", "begin"));
    ASSERT_OK(Put("z", "end"));
    ASSERT_OK(Flush());
  }

  // If pending output guard does not work correctly, PurgeObsoleteFiles() will
  // delete the file that Compaction is trying to create, causing this: error
  // db/db_test.cc:975: IO error:
  // /tmp/rocksdbtest-1552237650/db_test/000009.sst: No such file or directory
  Compact("a", "b");
}

// 1 Create some SST files by inserting K-V pairs into DB
// 2 Close DB and change suffix from ".sst" to ".ldb" for every other SST file
// 3 Open DB and check if all key can be read
TEST_F(DBSSTTest, SSTsWithLdbSuffixHandling) {
  Options options = CurrentOptions();
  options.write_buffer_size = 110 << 10;  // 110KB
  options.num_levels = 4;
  DestroyAndReopen(options);

  Random rnd(301);
  int key_id = 0;
  for (int i = 0; i < 10; ++i) {
    GenerateNewFile(&rnd, &key_id, false);
  }
  Flush();
  Close();
  int const num_files = GetSstFileCount(dbname_);
  ASSERT_GT(num_files, 0);

  Reopen(options);
  std::vector<std::string> values;
  values.reserve(key_id);
  for (int k = 0; k < key_id; ++k) {
    values.push_back(Get(Key(k)));
  }
  Close();

  std::vector<std::string> filenames;
  GetSstFiles(env_, dbname_, &filenames);
  int num_ldb_files = 0;
  for (size_t i = 0; i < filenames.size(); ++i) {
    if (i & 1) {
      continue;
    }
    std::string const rdb_name = dbname_ + "/" + filenames[i];
    std::string const ldb_name = Rocks2LevelTableFileName(rdb_name);
    ASSERT_TRUE(env_->RenameFile(rdb_name, ldb_name).ok());
    ++num_ldb_files;
  }
  ASSERT_GT(num_ldb_files, 0);
  ASSERT_EQ(num_files, GetSstFileCount(dbname_));

  Reopen(options);
  for (int k = 0; k < key_id; ++k) {
    ASSERT_EQ(values[k], Get(Key(k)));
  }
  Destroy(options);
}

// Check that we don't crash when opening DB with
// DBOptions::skip_checking_sst_file_sizes_on_db_open = true.
TEST_F(DBSSTTest, SkipCheckingSSTFileSizesOnDBOpen) {
  ASSERT_OK(Put("pika", "choo"));
  ASSERT_OK(Flush());

  // Just open the DB with the option set to true and check that we don't crash.
  Options options;
  options.skip_checking_sst_file_sizes_on_db_open = true;
  Reopen(options);

  ASSERT_EQ("choo", Get("pika"));
}

#ifndef ROCKSDB_LITE
TEST_F(DBSSTTest, DontDeleteMovedFile) {
  // This test triggers move compaction and verifies that the file is not
  // deleted when it's part of move compaction
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.max_bytes_for_level_base = 1024 * 1024;  // 1 MB
  options.level0_file_num_compaction_trigger =
      2;  // trigger compaction when we have 2 files
  DestroyAndReopen(options);

  Random rnd(301);
  // Create two 1MB sst files
  for (int i = 0; i < 2; ++i) {
    // Create 1MB sst file
    for (int j = 0; j < 100; ++j) {
      ASSERT_OK(Put(Key(i * 50 + j), rnd.RandomString(10 * 1024)));
    }
    ASSERT_OK(Flush());
  }
  // this should execute both L0->L1 and L1->(move)->L2 compactions
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ("0,0,1", FilesPerLevel(0));

  // If the moved file is actually deleted (the move-safeguard in
  // ~Version::Version() is not there), we get this failure:
  // Corruption: Can't access /000009.sst
  Reopen(options);
}

// This reproduces a bug where we don't delete a file because when it was
// supposed to be deleted, it was blocked by pending_outputs
// Consider:
// 1. current file_number is 13
// 2. compaction (1) starts, blocks deletion of all files starting with 13
// (pending outputs)
// 3. file 13 is created by compaction (2)
// 4. file 13 is consumed by compaction (3) and file 15 was created. Since file
// 13 has no references, it is put into VersionSet::obsolete_files_
// 5. FindObsoleteFiles() gets file 13 from VersionSet::obsolete_files_. File 13
// is deleted from obsolete_files_ set.
// 6. PurgeObsoleteFiles() tries to delete file 13, but this file is blocked by
// pending outputs since compaction (1) is still running. It is not deleted and
// it is not present in obsolete_files_ anymore. Therefore, we never delete it.
TEST_F(DBSSTTest, DeleteObsoleteFilesPendingOutputs) {
  Options options = CurrentOptions();
  options.env = env_;
  options.write_buffer_size = 2 * 1024 * 1024;     // 2 MB
  options.max_bytes_for_level_base = 1024 * 1024;  // 1 MB
  options.level0_file_num_compaction_trigger =
      2;  // trigger compaction when we have 2 files
  options.max_background_flushes = 2;
  options.max_background_compactions = 2;

  OnFileDeletionListener* listener = new OnFileDeletionListener();
  options.listeners.emplace_back(listener);

  Reopen(options);

  Random rnd(301);
  // Create two 1MB sst files
  for (int i = 0; i < 2; ++i) {
    // Create 1MB sst file
    for (int j = 0; j < 100; ++j) {
      ASSERT_OK(Put(Key(i * 50 + j), rnd.RandomString(10 * 1024)));
    }
    ASSERT_OK(Flush());
  }
  // this should execute both L0->L1 and L1->(move)->L2 compactions
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ("0,0,1", FilesPerLevel(0));

  test::SleepingBackgroundTask blocking_thread;
  port::Mutex mutex_;
  bool already_blocked(false);

  // block the flush
  std::function<void()> block_first_time = [&]() {
    bool blocking = false;
    {
      MutexLock l(&mutex_);
      if (!already_blocked) {
        blocking = true;
        already_blocked = true;
      }
    }
    if (blocking) {
      blocking_thread.DoSleep();
    }
  };
  env_->table_write_callback_ = &block_first_time;
  // Insert 2.5MB data, which should trigger a flush because we exceed
  // write_buffer_size. The flush will be blocked with block_first_time
  // pending_file is protecting all the files created after
  for (int j = 0; j < 256; ++j) {
    ASSERT_OK(Put(Key(j), rnd.RandomString(10 * 1024)));
  }
  blocking_thread.WaitUntilSleeping();

  ASSERT_OK(dbfull()->TEST_CompactRange(2, nullptr, nullptr));

  ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
  std::vector<LiveFileMetaData> metadata;
  db_->GetLiveFilesMetaData(&metadata);
  ASSERT_EQ(metadata.size(), 1U);
  auto file_on_L2 = metadata[0].name;
  listener->SetExpectedFileName(dbname_ + file_on_L2);

  ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr, nullptr,
                                        true /* disallow trivial move */));
  ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));

  // finish the flush!
  blocking_thread.WakeUp();
  blocking_thread.WaitUntilDone();
  dbfull()->TEST_WaitForFlushMemTable();
  // File just flushed is too big for L0 and L1 so gets moved to L2.
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ("0,0,1,0,1", FilesPerLevel(0));

  metadata.clear();
  db_->GetLiveFilesMetaData(&metadata);
  ASSERT_EQ(metadata.size(), 2U);

  // This file should have been deleted during last compaction
  ASSERT_EQ(Status::NotFound(), env_->FileExists(dbname_ + file_on_L2));
  listener->VerifyMatchedCount(1);
}

TEST_F(DBSSTTest, DBWithSstFileManager) {
  std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
  auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());

  int files_added = 0;
  int files_deleted = 0;
  int files_moved = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SstFileManagerImpl::OnAddFile", [&](void* /*arg*/) { files_added++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SstFileManagerImpl::OnDeleteFile",
      [&](void* /*arg*/) { files_deleted++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SstFileManagerImpl::OnMoveFile", [&](void* /*arg*/) { files_moved++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.sst_file_manager = sst_file_manager;
  DestroyAndReopen(options);

  Random rnd(301);
  for (int i = 0; i < 25; i++) {
    GenerateNewRandomFile(&rnd);
    ASSERT_OK(Flush());
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
    // Verify that we are tracking all sst files in dbname_
    std::unordered_map<std::string, uint64_t> files_in_db;
    ASSERT_OK(GetAllSSTFiles(&files_in_db));
    ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
  }
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  std::unordered_map<std::string, uint64_t> files_in_db;
  ASSERT_OK(GetAllSSTFiles(&files_in_db));
  // Verify that we are tracking all sst files in dbname_
  ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
  // Verify the total files size
  uint64_t total_files_size = 0;
  for (auto& file_to_size : files_in_db) {
    total_files_size += file_to_size.second;
  }
  ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
  // We flushed at least 25 files
  ASSERT_GE(files_added, 25);
  // Compaction must have deleted some files
  ASSERT_GT(files_deleted, 0);
  // No files were moved
  ASSERT_EQ(files_moved, 0);

  Close();
  Reopen(options);
  ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
  ASSERT_EQ(sfm->GetTotalSize(), total_files_size);

  // Verify that we track all the files again after the DB is closed and opened
  Close();
  sst_file_manager.reset(NewSstFileManager(env_));
  options.sst_file_manager = sst_file_manager;
  sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());

  Reopen(options);
  ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
  ASSERT_EQ(sfm->GetTotalSize(), total_files_size);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBSSTTest, RateLimitedDelete) {
  Destroy(last_options_);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
      {"DBSSTTest::RateLimitedDelete:1",
       "DeleteScheduler::BackgroundEmptyTrash"},
  });

  std::vector<uint64_t> penalties;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DeleteScheduler::BackgroundEmptyTrash:Wait",
      [&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
        // Turn timed wait into a simulated sleep
        uint64_t* abs_time_us = static_cast<uint64_t*>(arg);
        uint64_t cur_time = env_->NowMicros();
        if (*abs_time_us > cur_time) {
          env_->MockSleepForMicroseconds(*abs_time_us - cur_time);
        }

        // Plus an additional short, random amount
        env_->MockSleepForMicroseconds(Random::GetTLSInstance()->Uniform(10));

        // Set wait until time to before (actual) current time to force not
        // to sleep
        *abs_time_us = Env::Default()->NowMicros();
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  SetTimeElapseOnlySleepOnReopen(&options);
  options.disable_auto_compactions = true;
  options.env = env_;
  options.statistics = CreateDBStatistics();

  int64_t rate_bytes_per_sec = 1024 * 10;  // 10 Kbs / Sec
  Status s;
  options.sst_file_manager.reset(
      NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
  ASSERT_OK(s);
  options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
  auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
  sfm->delete_scheduler()->SetMaxTrashDBRatio(1.1);

  WriteOptions wo;
  wo.disableWAL = true;
  ASSERT_OK(TryReopen(options));
  // Create 4 files in L0
  for (char v = 'a'; v <= 'd'; v++) {
    ASSERT_OK(Put("Key2", DummyString(1024, v), wo));
    ASSERT_OK(Put("Key3", DummyString(1024, v), wo));
    ASSERT_OK(Put("Key4", DummyString(1024, v), wo));
    ASSERT_OK(Put("Key1", DummyString(1024, v), wo));
    ASSERT_OK(Put("Key4", DummyString(1024, v), wo));
    ASSERT_OK(Flush());
  }
  // We created 4 sst files in L0
  ASSERT_EQ("4", FilesPerLevel(0));

  std::vector<LiveFileMetaData> metadata;
  db_->GetLiveFilesMetaData(&metadata);

  // Compaction will move the 4 files in L0 to trash and create 1 L1 file
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
  ASSERT_EQ("0,1", FilesPerLevel(0));

  uint64_t delete_start_time = env_->NowMicros();
  // Hold BackgroundEmptyTrash
  TEST_SYNC_POINT("DBSSTTest::RateLimitedDelete:1");
  sfm->WaitForEmptyTrash();
  uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time;

  uint64_t total_files_size = 0;
  uint64_t expected_penlty = 0;
  ASSERT_EQ(penalties.size(), metadata.size());
  for (size_t i = 0; i < metadata.size(); i++) {
    total_files_size += metadata[i].size;
    expected_penlty = ((total_files_size * 1000000) / rate_bytes_per_sec);
    ASSERT_EQ(expected_penlty, penalties[i]);
  }
  ASSERT_GT(time_spent_deleting, expected_penlty * 0.9);
  ASSERT_LT(time_spent_deleting, expected_penlty * 1.1);
  ASSERT_EQ(4, options.statistics->getAndResetTickerCount(FILES_MARKED_TRASH));
  ASSERT_EQ(
      0, options.statistics->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBSSTTest, RateLimitedWALDelete) {
  Destroy(last_options_);

  std::vector<uint64_t> penalties;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DeleteScheduler::BackgroundEmptyTrash:Wait",
      [&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); });

  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.compression = kNoCompression;
  options.env = env_;

  int64_t rate_bytes_per_sec = 1024 * 10;  // 10 Kbs / Sec
  Status s;
  options.sst_file_manager.reset(
      NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
  ASSERT_OK(s);
  options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
  auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
  sfm->delete_scheduler()->SetMaxTrashDBRatio(3.1);
  SetTimeElapseOnlySleepOnReopen(&options);

  ASSERT_OK(TryReopen(options));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Create 4 files in L0
  for (char v = 'a'; v <= 'd'; v++) {
    ASSERT_OK(Put("Key2", DummyString(1024, v)));
    ASSERT_OK(Put("Key3", DummyString(1024, v)));
    ASSERT_OK(Put("Key4", DummyString(1024, v)));
    ASSERT_OK(Put("Key1", DummyString(1024, v)));
    ASSERT_OK(Put("Key4", DummyString(1024, v)));
    ASSERT_OK(Flush());
  }
  // We created 4 sst files in L0
  ASSERT_EQ("4", FilesPerLevel(0));

  // Compaction will move the 4 files in L0 to trash and create 1 L1 file
  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
  ASSERT_EQ("0,1", FilesPerLevel(0));

  sfm->WaitForEmptyTrash();
  ASSERT_EQ(penalties.size(), 8);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

class DBWALTestWithParam
    : public DBSSTTest,
      public testing::WithParamInterface<std::tuple<std::string, bool>> {
 public:
  DBWALTestWithParam() {
    wal_dir_ = std::get<0>(GetParam());
    wal_dir_same_as_dbname_ = std::get<1>(GetParam());
  }

  std::string wal_dir_;
  bool wal_dir_same_as_dbname_;
};

TEST_P(DBWALTestWithParam, WALTrashCleanupOnOpen) {
  class MyEnv : public EnvWrapper {
   public:
    MyEnv(Env* t) : EnvWrapper(t), fake_log_delete(false) {}

    Status DeleteFile(const std::string& fname) {
      if (fname.find(".log.trash") != std::string::npos && fake_log_delete) {
        return Status::OK();
      }

      return target()->DeleteFile(fname);
    }

    void set_fake_log_delete(bool fake) { fake_log_delete = fake; }

   private:
    bool fake_log_delete;
  };

  std::unique_ptr<MyEnv> env(new MyEnv(Env::Default()));
  Destroy(last_options_);

  env->set_fake_log_delete(true);

  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.compression = kNoCompression;
  options.env = env.get();
  options.wal_dir = dbname_ + wal_dir_;

  int64_t rate_bytes_per_sec = 1024 * 10;  // 10 Kbs / Sec
  Status s;
  options.sst_file_manager.reset(
      NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
  ASSERT_OK(s);
  options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
  auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
  sfm->delete_scheduler()->SetMaxTrashDBRatio(3.1);

  ASSERT_OK(TryReopen(options));

  // Create 4 files in L0
  for (char v = 'a'; v <= 'd'; v++) {
    ASSERT_OK(Put("Key2", DummyString(1024, v)));
    ASSERT_OK(Put("Key3", DummyString(1024, v)));
    ASSERT_OK(Put("Key4", DummyString(1024, v)));
    ASSERT_OK(Put("Key1", DummyString(1024, v)));
    ASSERT_OK(Put("Key4", DummyString(1024, v)));
    ASSERT_OK(Flush());
  }
  // We created 4 sst files in L0
  ASSERT_EQ("4", FilesPerLevel(0));

  Close();

  options.sst_file_manager.reset();
  std::vector<std::string> filenames;
  int trash_log_count = 0;
  if (!wal_dir_same_as_dbname_) {
    // Forcibly create some trash log files
    std::unique_ptr<WritableFile> result;
    env->NewWritableFile(options.wal_dir + "/1000.log.trash", &result,
                         EnvOptions());
    result.reset();
  }
  env->GetChildren(options.wal_dir, &filenames);
  for (const std::string& fname : filenames) {
    if (fname.find(".log.trash") != std::string::npos) {
      trash_log_count++;
    }
  }
  ASSERT_GE(trash_log_count, 1);

  env->set_fake_log_delete(false);
  ASSERT_OK(TryReopen(options));

  filenames.clear();
  trash_log_count = 0;
  env->GetChildren(options.wal_dir, &filenames);
  for (const std::string& fname : filenames) {
    if (fname.find(".log.trash") != std::string::npos) {
      trash_log_count++;
    }
  }
  ASSERT_EQ(trash_log_count, 0);
  Close();
}

INSTANTIATE_TEST_CASE_P(DBWALTestWithParam, DBWALTestWithParam,
                        ::testing::Values(std::make_tuple("", true),
                                          std::make_tuple("_wal_dir", false)));

TEST_F(DBSSTTest, OpenDBWithExistingTrash) {
  Options options = CurrentOptions();

  options.sst_file_manager.reset(
      NewSstFileManager(env_, nullptr, "", 1024 * 1024 /* 1 MB/sec */));
  auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());

  Destroy(last_options_);

  // Add some trash files to the db directory so the DB can clean them up
  env_->CreateDirIfMissing(dbname_);
  ASSERT_OK(WriteStringToFile(env_, "abc", dbname_ + "/" + "001.sst.trash"));
  ASSERT_OK(WriteStringToFile(env_, "abc", dbname_ + "/" + "002.sst.trash"));
  ASSERT_OK(WriteStringToFile(env_, "abc", dbname_ + "/" + "003.sst.trash"));

  // Reopen the DB and verify that it deletes existing trash files
  ASSERT_OK(TryReopen(options));
  sfm->WaitForEmptyTrash();
  ASSERT_NOK(env_->FileExists(dbname_ + "/" + "001.sst.trash"));
  ASSERT_NOK(env_->FileExists(dbname_ + "/" + "002.sst.trash"));
  ASSERT_NOK(env_->FileExists(dbname_ + "/" + "003.sst.trash"));
}


// Create a DB with 2 db_paths, and generate multiple files in the 2
// db_paths using CompactRangeOptions, make sure that files that were
// deleted from first db_path were deleted using DeleteScheduler and
// files in the second path were not.
TEST_F(DBSSTTest, DeleteSchedulerMultipleDBPaths) {
  std::atomic<int> bg_delete_file(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DeleteScheduler::DeleteTrashFile:DeleteFile",
      [&](void* /*arg*/) { bg_delete_file++; });
  // The deletion scheduler sometimes skips marking file as trash according to
  // a heuristic. In that case the deletion will go through the below SyncPoint.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DeleteScheduler::DeleteFile", [&](void* /*arg*/) { bg_delete_file++; });

  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.db_paths.emplace_back(dbname_, 1024 * 100);
  options.db_paths.emplace_back(dbname_ + "_2", 1024 * 100);
  options.env = env_;

  int64_t rate_bytes_per_sec = 1024 * 1024;  // 1 Mb / Sec
  Status s;
  options.sst_file_manager.reset(
      NewSstFileManager(env_, nullptr, "", rate_bytes_per_sec, false, &s,
                        /* max_trash_db_ratio= */ 1.1));

  ASSERT_OK(s);
  auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());

  DestroyAndReopen(options);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  WriteOptions wo;
  wo.disableWAL = true;

  // Create 4 files in L0
  for (int i = 0; i < 4; i++) {
    ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'A'), wo));
    ASSERT_OK(Flush());
  }
  // We created 4 sst files in L0
  ASSERT_EQ("4", FilesPerLevel(0));
  // Compaction will delete files from L0 in first db path and generate a new
  // file in L1 in second db path
  CompactRangeOptions compact_options;
  compact_options.target_path_id = 1;
  Slice begin("Key0");
  Slice end("Key3");
  ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
  ASSERT_EQ("0,1", FilesPerLevel(0));

  // Create 4 files in L0
  for (int i = 4; i < 8; i++) {
    ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'B'), wo));
    ASSERT_OK(Flush());
  }
  ASSERT_EQ("4,1", FilesPerLevel(0));

  // Compaction will delete files from L0 in first db path and generate a new
  // file in L1 in second db path
  begin = "Key4";
  end = "Key7";
  ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
  ASSERT_EQ("0,2", FilesPerLevel(0));

  sfm->WaitForEmptyTrash();
  ASSERT_EQ(bg_delete_file, 8);

  // Compaction will delete both files and regenerate a file in L1 in second
  // db path. The deleted files should still be cleaned up via delete scheduler.
  compact_options.bottommost_level_compaction =
      BottommostLevelCompaction::kForceOptimized;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  ASSERT_EQ("0,1", FilesPerLevel(0));

  sfm->WaitForEmptyTrash();
  ASSERT_EQ(bg_delete_file, 10);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBSSTTest, DestroyDBWithRateLimitedDelete) {
  int bg_delete_file = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DeleteScheduler::DeleteTrashFile:DeleteFile",
      [&](void* /*arg*/) { bg_delete_file++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Status s;
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.env = env_;
  options.sst_file_manager.reset(
      NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
  ASSERT_OK(s);
  DestroyAndReopen(options);

  // Create 4 files in L0
  for (int i = 0; i < 4; i++) {
    ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'A')));
    ASSERT_OK(Flush());
  }
  // We created 4 sst files in L0
  ASSERT_EQ("4", FilesPerLevel(0));

  // Close DB and destroy it using DeleteScheduler
  Close();

  int num_sst_files = 0;
  int num_wal_files = 0;
  std::vector<std::string> db_files;
  env_->GetChildren(dbname_, &db_files);
  for (std::string f : db_files) {
    if (f.substr(f.find_last_of(".") + 1) == "sst") {
      num_sst_files++;
    } else if (f.substr(f.find_last_of(".") + 1) == "log") {
      num_wal_files++;
    }
  }
  ASSERT_GT(num_sst_files, 0);
  ASSERT_GT(num_wal_files, 0);

  auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());

  sfm->SetDeleteRateBytesPerSecond(1024 * 1024);
  sfm->delete_scheduler()->SetMaxTrashDBRatio(1.1);
  ASSERT_OK(DestroyDB(dbname_, options));
  sfm->WaitForEmptyTrash();
  ASSERT_EQ(bg_delete_file, num_sst_files + num_wal_files);
}

TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) {
  std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
  auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());

  Options options = CurrentOptions();
  options.sst_file_manager = sst_file_manager;
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);

  Random rnd(301);

  // Generate a file containing 100 keys.
  for (int i = 0; i < 100; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
  }
  ASSERT_OK(Flush());

  uint64_t first_file_size = 0;
  std::unordered_map<std::string, uint64_t> files_in_db;
  ASSERT_OK(GetAllSSTFiles(&files_in_db, &first_file_size));
  ASSERT_EQ(sfm->GetTotalSize(), first_file_size);

  // Set the maximum allowed space usage to the current total size
  sfm->SetMaxAllowedSpaceUsage(first_file_size + 1);

  ASSERT_OK(Put("key1", "val1"));
  // This flush will cause bg_error_ and will fail
  ASSERT_NOK(Flush());
}

TEST_F(DBSSTTest, CancellingCompactionsWorks) {
  std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
  auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());

  Options options = CurrentOptions();
  options.sst_file_manager = sst_file_manager;
  options.level0_file_num_compaction_trigger = 2;
  options.statistics = CreateDBStatistics();
  DestroyAndReopen(options);

  int completed_compactions = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* /*arg*/) {
        sfm->SetMaxAllowedSpaceUsage(0);
        ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun",
      [&](void* /*arg*/) { completed_compactions++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);

  // Generate a file containing 10 keys.
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
  }
  ASSERT_OK(Flush());
  uint64_t total_file_size = 0;
  std::unordered_map<std::string, uint64_t> files_in_db;
  ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_file_size));
  // Set the maximum allowed space usage to the current total size
  sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1);

  // Generate another file to trigger compaction.
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
  }
  ASSERT_OK(Flush());
  dbfull()->TEST_WaitForCompact(true);

  // Because we set a callback in CancelledCompaction, we actually
  // let the compaction run
  ASSERT_GT(completed_compactions, 0);
  ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
  // Make sure the stat is bumped
  ASSERT_GT(dbfull()->immutable_db_options().statistics.get()->getTickerCount(COMPACTION_CANCELLED), 0);
  ASSERT_EQ(0,
            dbfull()->immutable_db_options().statistics.get()->getTickerCount(
                FILES_MARKED_TRASH));
  ASSERT_EQ(4,
            dbfull()->immutable_db_options().statistics.get()->getTickerCount(
                FILES_DELETED_IMMEDIATELY));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBSSTTest, CancellingManualCompactionsWorks) {
  std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
  auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());

  Options options = CurrentOptions();
  options.sst_file_manager = sst_file_manager;
  options.statistics = CreateDBStatistics();

  FlushedFileCollector* collector = new FlushedFileCollector();
  options.listeners.emplace_back(collector);

  DestroyAndReopen(options);

  Random rnd(301);

  // Generate a file containing 10 keys.
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
  }
  ASSERT_OK(Flush());
  uint64_t total_file_size = 0;
  std::unordered_map<std::string, uint64_t> files_in_db;
  ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_file_size));
  // Set the maximum allowed space usage to the current total size
  sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1);

  // Generate another file to trigger compaction.
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
  }
  ASSERT_OK(Flush());

  // OK, now trigger a manual compaction
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);

  // Wait for manual compaction to get scheduled and finish
  dbfull()->TEST_WaitForCompact(true);

  ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
  // Make sure the stat is bumped
  ASSERT_EQ(dbfull()->immutable_db_options().statistics.get()->getTickerCount(
                COMPACTION_CANCELLED),
            1);

  // Now make sure CompactFiles also gets cancelled
  auto l0_files = collector->GetFlushedFiles();
  dbfull()->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), l0_files, 0);

  // Wait for manual compaction to get scheduled and finish
  dbfull()->TEST_WaitForCompact(true);

  ASSERT_EQ(dbfull()->immutable_db_options().statistics.get()->getTickerCount(
                COMPACTION_CANCELLED),
            2);
  ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);

  // Now let the flush through and make sure GetCompactionsReservedSize
  // returns to normal
  sfm->SetMaxAllowedSpaceUsage(0);
  int completed_compactions = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactFilesImpl:End", [&](void* /*arg*/) { completed_compactions++; });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  dbfull()->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), l0_files, 0);
  dbfull()->TEST_WaitForCompact(true);

  ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
  ASSERT_GT(completed_compactions, 0);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) {
  // This test will set a maximum allowed space for the DB, then it will
  // keep filling the DB until the limit is reached and bg_error_ is set.
  // When bg_error_ is set we will verify that the DB size is greater
  // than the limit.

  std::vector<int> max_space_limits_mbs = {1, 10};
  std::atomic<bool> bg_error_set(false);

  std::atomic<int> reached_max_space_on_flush(0);
  std::atomic<int> reached_max_space_on_compaction(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
      [&](void* arg) {
        Status* bg_error = static_cast<Status*>(arg);
        bg_error_set = true;
        reached_max_space_on_flush++;
        // clear error to ensure compaction callback is called
        *bg_error = Status::OK();
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* arg) {
        bool* enough_room = static_cast<bool*>(arg);
        *enough_room = true;
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached",
      [&](void* /*arg*/) {
        bg_error_set = true;
        reached_max_space_on_compaction++;
      });

  for (auto limit_mb : max_space_limits_mbs) {
    bg_error_set = false;
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
    std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
    auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());

    Options options = CurrentOptions();
    options.sst_file_manager = sst_file_manager;
    options.write_buffer_size = 1024 * 512;  // 512 Kb
    DestroyAndReopen(options);
    Random rnd(301);

    sfm->SetMaxAllowedSpaceUsage(limit_mb * 1024 * 1024);

    // It is easy to detect if the test is stuck in a loop. No need for
    // complex termination logic.
    while (true) {
      auto s = Put(rnd.RandomString(10), rnd.RandomString(50));
      if (!s.ok()) {
        break;
      }
    }
    ASSERT_TRUE(bg_error_set);
    uint64_t total_sst_files_size = 0;
    std::unordered_map<std::string, uint64_t> files_in_db;
    ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_sst_files_size));
    ASSERT_GE(total_sst_files_size, limit_mb * 1024 * 1024);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }

  ASSERT_GT(reached_max_space_on_flush, 0);
  ASSERT_GT(reached_max_space_on_compaction, 0);
}

TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFiles) {
  // Open DB with infinite max open files
  //  - First iteration use 1 thread to open files
  //  - Second iteration use 5 threads to open files
  for (int iter = 0; iter < 2; iter++) {
    Options options;
    options.create_if_missing = true;
    options.write_buffer_size = 100000;
    options.disable_auto_compactions = true;
    options.max_open_files = -1;
    if (iter == 0) {
      options.max_file_opening_threads = 1;
    } else {
      options.max_file_opening_threads = 5;
    }
    options = CurrentOptions(options);
    DestroyAndReopen(options);

    // Create 12 Files in L0 (then move then to L2)
    for (int i = 0; i < 12; i++) {
      std::string k = "L2_" + Key(i);
      ASSERT_OK(Put(k, k + std::string(1000, 'a')));
      ASSERT_OK(Flush());
    }
    CompactRangeOptions compact_options;
    compact_options.change_level = true;
    compact_options.target_level = 2;
    db_->CompactRange(compact_options, nullptr, nullptr);

    // Create 12 Files in L0
    for (int i = 0; i < 12; i++) {
      std::string k = "L0_" + Key(i);
      ASSERT_OK(Put(k, k + std::string(1000, 'a')));
      ASSERT_OK(Flush());
    }
    Close();

    // Reopening the DB will load all existing files
    Reopen(options);
    ASSERT_EQ("12,0,12", FilesPerLevel(0));
    std::vector<std::vector<FileMetaData>> files;
    dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files);

    for (const auto& level : files) {
      for (const auto& file : level) {
        ASSERT_TRUE(file.table_reader_handle != nullptr);
      }
    }

    for (int i = 0; i < 12; i++) {
      ASSERT_EQ(Get("L0_" + Key(i)), "L0_" + Key(i) + std::string(1000, 'a'));
      ASSERT_EQ(Get("L2_" + Key(i)), "L2_" + Key(i) + std::string(1000, 'a'));
    }
  }
}

TEST_F(DBSSTTest, GetTotalSstFilesSize) {
  // We don't propagate oldest-key-time table property on compaction and
  // just write 0 as default value. This affect the exact table size, since
  // we encode table properties as varint64. Force time to be 0 to work around
  // it. Should remove the workaround after we propagate the property on
  // compaction.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table:oldest_ancester_time", [&](void* arg) {
        uint64_t* current_time = static_cast<uint64_t*>(arg);
        *current_time = 0;
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.compression = kNoCompression;
  DestroyAndReopen(options);
  // Generate 5 files in L0
  for (int i = 0; i < 5; i++) {
    for (int j = 0; j < 10; j++) {
      std::string val = "val_file_" + ToString(i);
      ASSERT_OK(Put(Key(j), val));
    }
    Flush();
  }
  ASSERT_EQ("5", FilesPerLevel(0));

  std::vector<LiveFileMetaData> live_files_meta;
  dbfull()->GetLiveFilesMetaData(&live_files_meta);
  ASSERT_EQ(live_files_meta.size(), 5);
  uint64_t single_file_size = live_files_meta[0].size;

  uint64_t live_sst_files_size = 0;
  uint64_t total_sst_files_size = 0;
  for (const auto& file_meta : live_files_meta) {
    live_sst_files_size += file_meta.size;
  }

  ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                       &total_sst_files_size));
  // Live SST files = 5
  // Total SST files = 5
  ASSERT_EQ(live_sst_files_size, 5 * single_file_size);
  ASSERT_EQ(total_sst_files_size, 5 * single_file_size);

  // hold current version
  std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));

  // Compact 5 files into 1 file in L0
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,1", FilesPerLevel(0));

  live_files_meta.clear();
  dbfull()->GetLiveFilesMetaData(&live_files_meta);
  ASSERT_EQ(live_files_meta.size(), 1);

  live_sst_files_size = 0;
  total_sst_files_size = 0;
  for (const auto& file_meta : live_files_meta) {
    live_sst_files_size += file_meta.size;
  }
  ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                       &total_sst_files_size));
  // Live SST files = 1 (compacted file)
  // Total SST files = 6 (5 original files + compacted file)
  ASSERT_EQ(live_sst_files_size, 1 * single_file_size);
  ASSERT_EQ(total_sst_files_size, 6 * single_file_size);

  // hold current version
  std::unique_ptr<Iterator> iter2(dbfull()->NewIterator(ReadOptions()));

  // Delete all keys and compact, this will delete all live files
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(Delete(Key(i)));
  }
  Flush();
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("", FilesPerLevel(0));

  live_files_meta.clear();
  dbfull()->GetLiveFilesMetaData(&live_files_meta);
  ASSERT_EQ(live_files_meta.size(), 0);

  ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                       &total_sst_files_size));
  // Live SST files = 0
  // Total SST files = 6 (5 original files + compacted file)
  ASSERT_EQ(total_sst_files_size, 6 * single_file_size);

  iter1.reset();
  ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                       &total_sst_files_size));
  // Live SST files = 0
  // Total SST files = 1 (compacted file)
  ASSERT_EQ(total_sst_files_size, 1 * single_file_size);

  iter2.reset();
  ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                       &total_sst_files_size));
  // Live SST files = 0
  // Total SST files = 0
  ASSERT_EQ(total_sst_files_size, 0);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.compression = kNoCompression;
  DestroyAndReopen(options);
  // Generate 5 files in L0
  for (int i = 0; i < 5; i++) {
    ASSERT_OK(Put(Key(i), "val"));
    Flush();
  }
  ASSERT_EQ("5", FilesPerLevel(0));

  std::vector<LiveFileMetaData> live_files_meta;
  dbfull()->GetLiveFilesMetaData(&live_files_meta);
  ASSERT_EQ(live_files_meta.size(), 5);
  uint64_t single_file_size = live_files_meta[0].size;

  uint64_t live_sst_files_size = 0;
  uint64_t total_sst_files_size = 0;
  for (const auto& file_meta : live_files_meta) {
    live_sst_files_size += file_meta.size;
  }

  ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                       &total_sst_files_size));

  // Live SST files = 5
  // Total SST files = 5
  ASSERT_EQ(live_sst_files_size, 5 * single_file_size);
  ASSERT_EQ(total_sst_files_size, 5 * single_file_size);

  // hold current version
  std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));

  // Compaction will do trivial move from L0 to L1
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,5", FilesPerLevel(0));

  live_files_meta.clear();
  dbfull()->GetLiveFilesMetaData(&live_files_meta);
  ASSERT_EQ(live_files_meta.size(), 5);

  live_sst_files_size = 0;
  total_sst_files_size = 0;
  for (const auto& file_meta : live_files_meta) {
    live_sst_files_size += file_meta.size;
  }
  ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                       &total_sst_files_size));
  // Live SST files = 5
  // Total SST files = 5 (used in 2 version)
  ASSERT_EQ(live_sst_files_size, 5 * single_file_size);
  ASSERT_EQ(total_sst_files_size, 5 * single_file_size);

  // hold current version
  std::unique_ptr<Iterator> iter2(dbfull()->NewIterator(ReadOptions()));

  // Delete all keys and compact, this will delete all live files
  for (int i = 0; i < 5; i++) {
    ASSERT_OK(Delete(Key(i)));
  }
  Flush();
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("", FilesPerLevel(0));

  live_files_meta.clear();
  dbfull()->GetLiveFilesMetaData(&live_files_meta);
  ASSERT_EQ(live_files_meta.size(), 0);

  ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                       &total_sst_files_size));
  // Live SST files = 0
  // Total SST files = 5 (used in 2 version)
  ASSERT_EQ(total_sst_files_size, 5 * single_file_size);

  iter1.reset();
  iter2.reset();

  ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                       &total_sst_files_size));
  // Live SST files = 0
  // Total SST files = 0
  ASSERT_EQ(total_sst_files_size, 0);
}

#endif  // ROCKSDB_LITE

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}
back to top