Revision cff0d1e8e68e6360328fa39e01e8566747185947 authored by Peter Dillinger on 18 March 2022, 18:06:17 UTC, committed by Facebook GitHub Bot on 18 March 2022, 18:06:17 UTC
Summary:
The primary goal of this change is to add support for backing up and
restoring (applying on restore) file temperature metadata, without
committing to either the DB manifest or the FS reported "current"
temperatures being exclusive "source of truth".

To achieve this goal, we need to add temperature information to backup
metadata, which requires updated backup meta schema. Fortunately I
prepared for this in https://github.com/facebook/rocksdb/issues/8069, which began forward compatibility in version
6.19.0 for this kind of schema update. (Previously, backup meta schema
was not extensible! Making this schema update public will allow some
other "nice to have" features like taking backups with hard links, and
avoiding crc32c checksum computation when another checksum is already
available.) While schema version 2 is newly public, the default schema
version is still 1. Until we change the default, users will need to set
to 2 to enable features like temperature data backup+restore. New
metadata like temperature information will be ignored with a warning
in versions before this change and since 6.19.0. The metadata is
considered ignorable because a functioning DB can be restored without
it.

Some detail:
* Some renaming because "future schema" is now just public schema 2.
* Initialize some atomics in TestFs (linter reported)
* Add temperature hint support to SstFileDumper (used by BackupEngine)

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9660

Test Plan:
related unit test majorly updated for the new functionality,
including some shared testing support for tracking temperatures in a FS.

Some other tests and testing hooks into production code also updated for
making the backup meta schema change public.

Reviewed By: ajkr

Differential Revision: D34686968

Pulled By: pdillinger

fbshipit-source-id: 3ac1fa3e67ee97ca8a5103d79cc87d872c1d862a
1 parent 3bdbf67
Raw File
mock_env.cc
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "env/mock_env.h"

#include <algorithm>
#include <chrono>

#include "env/emulated_clock.h"
#include "file/filename.h"
#include "port/sys_time.h"
#include "rocksdb/file_system.h"
#include "rocksdb/utilities/options_type.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/hash.h"
#include "util/random.h"
#include "util/rate_limiter.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {
namespace {
int64_t MaybeCurrentTime(const std::shared_ptr<SystemClock>& clock) {
  int64_t time = 1337346000;  // arbitrary fallback default
  clock->GetCurrentTime(&time).PermitUncheckedError();
  return time;
}

static std::unordered_map<std::string, OptionTypeInfo> time_elapse_type_info = {
#ifndef ROCKSDB_LITE
    {"time_elapse_only_sleep",
     {0, OptionType::kBoolean, OptionVerificationType::kNormal,
      OptionTypeFlags::kCompareNever,
      [](const ConfigOptions& /*opts*/, const std::string& /*name*/,
         const std::string& value, void* addr) {
        auto clock = static_cast<EmulatedSystemClock*>(addr);
        clock->SetTimeElapseOnlySleep(ParseBoolean("", value));
        return Status::OK();
      },
      [](const ConfigOptions& /*opts*/, const std::string& /*name*/,
         const void* addr, std::string* value) {
        const auto clock = static_cast<const EmulatedSystemClock*>(addr);
        *value = clock->IsTimeElapseOnlySleep() ? "true" : "false";
        return Status::OK();
      },
      nullptr}},
#endif  // ROCKSDB_LITE
};
static std::unordered_map<std::string, OptionTypeInfo> mock_sleep_type_info = {
#ifndef ROCKSDB_LITE
    {"mock_sleep",
     {0, OptionType::kBoolean, OptionVerificationType::kNormal,
      OptionTypeFlags::kCompareNever,
      [](const ConfigOptions& /*opts*/, const std::string& /*name*/,
         const std::string& value, void* addr) {
        auto clock = static_cast<EmulatedSystemClock*>(addr);
        clock->SetMockSleep(ParseBoolean("", value));
        return Status::OK();
      },
      [](const ConfigOptions& /*opts*/, const std::string& /*name*/,
         const void* addr, std::string* value) {
        const auto clock = static_cast<const EmulatedSystemClock*>(addr);
        *value = clock->IsMockSleepEnabled() ? "true" : "false";
        return Status::OK();
      },
      nullptr}},
#endif  // ROCKSDB_LITE
};
}  // namespace

EmulatedSystemClock::EmulatedSystemClock(
    const std::shared_ptr<SystemClock>& base, bool time_elapse_only_sleep)
    : SystemClockWrapper(base),
      maybe_starting_time_(MaybeCurrentTime(base)),
      time_elapse_only_sleep_(time_elapse_only_sleep),
      no_slowdown_(time_elapse_only_sleep) {
  RegisterOptions("", this, &time_elapse_type_info);
  RegisterOptions("", this, &mock_sleep_type_info);
}

class MemFile {
 public:
  explicit MemFile(SystemClock* clock, const std::string& fn,
                   bool _is_lock_file = false)
      : clock_(clock),
        fn_(fn),
        refs_(0),
        is_lock_file_(_is_lock_file),
        locked_(false),
        size_(0),
        modified_time_(Now()),
        rnd_(Lower32of64(GetSliceNPHash64(fn))),
        fsynced_bytes_(0) {}
  // No copying allowed.
  MemFile(const MemFile&) = delete;
  void operator=(const MemFile&) = delete;

  void Ref() {
    MutexLock lock(&mutex_);
    ++refs_;
  }

  bool is_lock_file() const { return is_lock_file_; }

  bool Lock() {
    assert(is_lock_file_);
    MutexLock lock(&mutex_);
    if (locked_) {
      return false;
    } else {
      locked_ = true;
      return true;
    }
  }

  void Unlock() {
    assert(is_lock_file_);
    MutexLock lock(&mutex_);
    locked_ = false;
  }

  void Unref() {
    bool do_delete = false;
    {
      MutexLock lock(&mutex_);
      --refs_;
      assert(refs_ >= 0);
      if (refs_ <= 0) {
        do_delete = true;
      }
    }

    if (do_delete) {
      delete this;
    }
  }

  uint64_t Size() const { return size_; }

  void Truncate(size_t size, const IOOptions& /*options*/,
                IODebugContext* /*dbg*/) {
    MutexLock lock(&mutex_);
    if (size < size_) {
      data_.resize(size);
      size_ = size;
    }
  }

  void CorruptBuffer() {
    if (fsynced_bytes_ >= size_) {
      return;
    }
    uint64_t buffered_bytes = size_ - fsynced_bytes_;
    uint64_t start =
        fsynced_bytes_ + rnd_.Uniform(static_cast<int>(buffered_bytes));
    uint64_t end = std::min(start + 512, size_.load());
    MutexLock lock(&mutex_);
    for (uint64_t pos = start; pos < end; ++pos) {
      data_[static_cast<size_t>(pos)] = static_cast<char>(rnd_.Uniform(256));
    }
  }

  IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*options*/,
                Slice* result, char* scratch, IODebugContext* /*dbg*/) const {
    {
      IOStatus s;
      TEST_SYNC_POINT_CALLBACK("MemFile::Read:IOStatus", &s);
      if (!s.ok()) {
        // with sync point only
        *result = Slice();
        return s;
      }
    }
    MutexLock lock(&mutex_);
    const uint64_t available = Size() - std::min(Size(), offset);
    size_t offset_ = static_cast<size_t>(offset);
    if (n > available) {
      n = static_cast<size_t>(available);
    }
    if (n == 0) {
      *result = Slice();
      return IOStatus::OK();
    }
    if (scratch) {
      memcpy(scratch, &(data_[offset_]), n);
      *result = Slice(scratch, n);
    } else {
      *result = Slice(&(data_[offset_]), n);
    }
    return IOStatus::OK();
  }

  IOStatus Write(uint64_t offset, const Slice& data,
                 const IOOptions& /*options*/, IODebugContext* /*dbg*/) {
    MutexLock lock(&mutex_);
    size_t offset_ = static_cast<size_t>(offset);
    if (offset + data.size() > data_.size()) {
      data_.resize(offset_ + data.size());
    }
    data_.replace(offset_, data.size(), data.data(), data.size());
    size_ = data_.size();
    modified_time_ = Now();
    return IOStatus::OK();
  }

  IOStatus Append(const Slice& data, const IOOptions& /*options*/,
                  IODebugContext* /*dbg*/) {
    MutexLock lock(&mutex_);
    data_.append(data.data(), data.size());
    size_ = data_.size();
    modified_time_ = Now();
    return IOStatus::OK();
  }

  IOStatus Fsync(const IOOptions& /*options*/, IODebugContext* /*dbg*/) {
    fsynced_bytes_ = size_.load();
    return IOStatus::OK();
  }

  uint64_t ModifiedTime() const { return modified_time_; }

 private:
  uint64_t Now() {
    int64_t unix_time = 0;
    auto s = clock_->GetCurrentTime(&unix_time);
    assert(s.ok());
    return static_cast<uint64_t>(unix_time);
  }

  // Private since only Unref() should be used to delete it.
  ~MemFile() { assert(refs_ == 0); }

  SystemClock* clock_;
  const std::string fn_;
  mutable port::Mutex mutex_;
  int refs_;
  bool is_lock_file_;
  bool locked_;

  // Data written into this file, all bytes before fsynced_bytes are
  // persistent.
  std::string data_;
  std::atomic<uint64_t> size_;
  std::atomic<uint64_t> modified_time_;

  Random rnd_;
  std::atomic<uint64_t> fsynced_bytes_;
};

namespace {

class MockSequentialFile : public FSSequentialFile {
 public:
  explicit MockSequentialFile(MemFile* file, const FileOptions& opts)
      : file_(file),
        use_direct_io_(opts.use_direct_reads),
        use_mmap_read_(opts.use_mmap_reads),
        pos_(0) {
    file_->Ref();
  }

  ~MockSequentialFile() override { file_->Unref(); }

  IOStatus Read(size_t n, const IOOptions& options, Slice* result,
                char* scratch, IODebugContext* dbg) override {
    IOStatus s = file_->Read(pos_, n, options, result,
                             (use_mmap_read_) ? nullptr : scratch, dbg);
    if (s.ok()) {
      pos_ += result->size();
    }
    return s;
  }

  bool use_direct_io() const override { return use_direct_io_; }
  IOStatus Skip(uint64_t n) override {
    if (pos_ > file_->Size()) {
      return IOStatus::IOError("pos_ > file_->Size()");
    }
    const uint64_t available = file_->Size() - pos_;
    if (n > available) {
      n = available;
    }
    pos_ += static_cast<size_t>(n);
    return IOStatus::OK();
  }

 private:
  MemFile* file_;
  bool use_direct_io_;
  bool use_mmap_read_;
  size_t pos_;
};

class MockRandomAccessFile : public FSRandomAccessFile {
 public:
  explicit MockRandomAccessFile(MemFile* file, const FileOptions& opts)
      : file_(file),
        use_direct_io_(opts.use_direct_reads),
        use_mmap_read_(opts.use_mmap_reads) {
    file_->Ref();
  }

  ~MockRandomAccessFile() override { file_->Unref(); }

  bool use_direct_io() const override { return use_direct_io_; }

  IOStatus Prefetch(uint64_t /*offset*/, size_t /*n*/,
                    const IOOptions& /*options*/,
                    IODebugContext* /*dbg*/) override {
    return IOStatus::OK();
  }

  IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
                Slice* result, char* scratch,
                IODebugContext* dbg) const override {
    if (use_mmap_read_) {
      return file_->Read(offset, n, options, result, nullptr, dbg);
    } else {
      return file_->Read(offset, n, options, result, scratch, dbg);
    }
  }

 private:
  MemFile* file_;
  bool use_direct_io_;
  bool use_mmap_read_;
};

class MockRandomRWFile : public FSRandomRWFile {
 public:
  explicit MockRandomRWFile(MemFile* file) : file_(file) { file_->Ref(); }

  ~MockRandomRWFile() override { file_->Unref(); }

  IOStatus Write(uint64_t offset, const Slice& data, const IOOptions& options,
                 IODebugContext* dbg) override {
    return file_->Write(offset, data, options, dbg);
  }

  IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
                Slice* result, char* scratch,
                IODebugContext* dbg) const override {
    return file_->Read(offset, n, options, result, scratch, dbg);
  }

  IOStatus Close(const IOOptions& options, IODebugContext* dbg) override {
    return file_->Fsync(options, dbg);
  }

  IOStatus Flush(const IOOptions& /*options*/,
                 IODebugContext* /*dbg*/) override {
    return IOStatus::OK();
  }

  IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
    return file_->Fsync(options, dbg);
  }

 private:
  MemFile* file_;
};

class MockWritableFile : public FSWritableFile {
 public:
  MockWritableFile(MemFile* file, const FileOptions& opts)
      : file_(file),
        use_direct_io_(opts.use_direct_writes),
        rate_limiter_(opts.rate_limiter) {
    file_->Ref();
  }

  ~MockWritableFile() override { file_->Unref(); }

  bool use_direct_io() const override { return false && use_direct_io_; }

  using FSWritableFile::Append;
  IOStatus Append(const Slice& data, const IOOptions& options,
                  IODebugContext* dbg) override {
    size_t bytes_written = 0;
    while (bytes_written < data.size()) {
      auto bytes = RequestToken(data.size() - bytes_written);
      IOStatus s = file_->Append(Slice(data.data() + bytes_written, bytes),
                                 options, dbg);
      if (!s.ok()) {
        return s;
      }
      bytes_written += bytes;
    }
    return IOStatus::OK();
  }

  using FSWritableFile::PositionedAppend;
  IOStatus PositionedAppend(const Slice& data, uint64_t /*offset*/,
                            const IOOptions& options,
                            IODebugContext* dbg) override {
    assert(use_direct_io_);
    return Append(data, options, dbg);
  }

  IOStatus Truncate(uint64_t size, const IOOptions& options,
                    IODebugContext* dbg) override {
    file_->Truncate(static_cast<size_t>(size), options, dbg);
    return IOStatus::OK();
  }
  IOStatus Close(const IOOptions& options, IODebugContext* dbg) override {
    return file_->Fsync(options, dbg);
  }

  IOStatus Flush(const IOOptions& /*options*/,
                 IODebugContext* /*dbg*/) override {
    return IOStatus::OK();
  }

  IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
    return file_->Fsync(options, dbg);
  }

  uint64_t GetFileSize(const IOOptions& /*options*/,
                       IODebugContext* /*dbg*/) override {
    return file_->Size();
  }

 private:
  inline size_t RequestToken(size_t bytes) {
    if (rate_limiter_ && io_priority_ < Env::IO_TOTAL) {
      bytes = std::min(
          bytes, static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()));
      rate_limiter_->Request(bytes, io_priority_);
    }
    return bytes;
  }

  MemFile* file_;
  bool use_direct_io_;
  RateLimiter* rate_limiter_;
};

class MockEnvDirectory : public FSDirectory {
 public:
  IOStatus Fsync(const IOOptions& /*options*/,
                 IODebugContext* /*dbg*/) override {
    return IOStatus::OK();
  }
};

class MockEnvFileLock : public FileLock {
 public:
  explicit MockEnvFileLock(const std::string& fname) : fname_(fname) {}

  std::string FileName() const { return fname_; }

 private:
  const std::string fname_;
};

class TestMemLogger : public Logger {
 private:
  std::unique_ptr<FSWritableFile> file_;
  std::atomic_size_t log_size_;
  static const uint64_t flush_every_seconds_ = 5;
  std::atomic_uint_fast64_t last_flush_micros_;
  SystemClock* clock_;
  IOOptions options_;
  IODebugContext* dbg_;
  std::atomic<bool> flush_pending_;

 public:
  TestMemLogger(std::unique_ptr<FSWritableFile> f, SystemClock* clock,
                const IOOptions& options, IODebugContext* dbg,
                const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL)
      : Logger(log_level),
        file_(std::move(f)),
        log_size_(0),
        last_flush_micros_(0),
        clock_(clock),
        options_(options),
        dbg_(dbg),
        flush_pending_(false) {}
  ~TestMemLogger() override {}

  void Flush() override {
    if (flush_pending_) {
      flush_pending_ = false;
    }
    last_flush_micros_ = clock_->NowMicros();
  }

  using Logger::Logv;
  void Logv(const char* format, va_list ap) override {
    // We try twice: the first time with a fixed-size stack allocated buffer,
    // and the second time with a much larger dynamically allocated buffer.
    char buffer[500];
    for (int iter = 0; iter < 2; iter++) {
      char* base;
      int bufsize;
      if (iter == 0) {
        bufsize = sizeof(buffer);
        base = buffer;
      } else {
        bufsize = 30000;
        base = new char[bufsize];
      }
      char* p = base;
      char* limit = base + bufsize;

      struct timeval now_tv;
      gettimeofday(&now_tv, nullptr);
      const time_t seconds = now_tv.tv_sec;
      struct tm t;
      memset(&t, 0, sizeof(t));
      struct tm* ret __attribute__((__unused__));
      ret = localtime_r(&seconds, &t);
      assert(ret);
      p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d ",
                    t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour,
                    t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec));

      // Print the message
      if (p < limit) {
        va_list backup_ap;
        va_copy(backup_ap, ap);
        p += vsnprintf(p, limit - p, format, backup_ap);
        va_end(backup_ap);
      }

      // Truncate to available space if necessary
      if (p >= limit) {
        if (iter == 0) {
          continue;  // Try again with larger buffer
        } else {
          p = limit - 1;
        }
      }

      // Add newline if necessary
      if (p == base || p[-1] != '\n') {
        *p++ = '\n';
      }

      assert(p <= limit);
      const size_t write_size = p - base;

      Status s = file_->Append(Slice(base, write_size), options_, dbg_);
      if (s.ok()) {
        flush_pending_ = true;
        log_size_ += write_size;
      }
      uint64_t now_micros =
          static_cast<uint64_t>(now_tv.tv_sec) * 1000000 + now_tv.tv_usec;
      if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {
        flush_pending_ = false;
        last_flush_micros_ = now_micros;
      }
      if (base != buffer) {
        delete[] base;
      }
      break;
    }
  }
  size_t GetLogFileSize() const override { return log_size_; }
};

static std::unordered_map<std::string, OptionTypeInfo> mock_fs_type_info = {
#ifndef ROCKSDB_LITE
    {"supports_direct_io",
     {0, OptionType::kBoolean, OptionVerificationType::kNormal,
      OptionTypeFlags::kNone}},
#endif  // ROCKSDB_LITE
};
}  // namespace

MockFileSystem::MockFileSystem(const std::shared_ptr<SystemClock>& clock,
                               bool supports_direct_io)
    : system_clock_(clock), supports_direct_io_(supports_direct_io) {
  clock_ = system_clock_.get();
  RegisterOptions("", &supports_direct_io_, &mock_fs_type_info);
}

MockFileSystem::~MockFileSystem() {
  for (auto i = file_map_.begin(); i != file_map_.end(); ++i) {
    i->second->Unref();
  }
}

Status MockFileSystem::PrepareOptions(const ConfigOptions& options) {
  Status s = FileSystem::PrepareOptions(options);
  if (s.ok() && system_clock_ == SystemClock::Default()) {
    system_clock_ = options.env->GetSystemClock();
    clock_ = system_clock_.get();
  }
  return s;
}

IOStatus MockFileSystem::GetAbsolutePath(const std::string& db_path,
                                         const IOOptions& /*options*/,
                                         std::string* output_path,
                                         IODebugContext* /*dbg*/) {
  *output_path = NormalizeMockPath(db_path);
  if (output_path->at(0) != '/') {
    return IOStatus::NotSupported("GetAbsolutePath");
  } else {
    return IOStatus::OK();
  }
}

std::string MockFileSystem::NormalizeMockPath(const std::string& path) {
  std::string p = NormalizePath(path);
  if (p.back() == kFilePathSeparator && p.size() > 1) {
    p.pop_back();
  }
  return p;
}

// Partial implementation of the FileSystem interface.
IOStatus MockFileSystem::NewSequentialFile(
    const std::string& fname, const FileOptions& file_opts,
    std::unique_ptr<FSSequentialFile>* result, IODebugContext* /*dbg*/) {
  auto fn = NormalizeMockPath(fname);

  MutexLock lock(&mutex_);
  if (file_map_.find(fn) == file_map_.end()) {
    *result = nullptr;
    return IOStatus::PathNotFound(fn);
  }
  auto* f = file_map_[fn];
  if (f->is_lock_file()) {
    return IOStatus::InvalidArgument(fn, "Cannot open a lock file.");
  } else if (file_opts.use_direct_reads && !supports_direct_io_) {
    return IOStatus::NotSupported("Direct I/O Not Supported");
  } else {
    result->reset(new MockSequentialFile(f, file_opts));
    return IOStatus::OK();
  }
}

IOStatus MockFileSystem::NewRandomAccessFile(
    const std::string& fname, const FileOptions& file_opts,
    std::unique_ptr<FSRandomAccessFile>* result, IODebugContext* /*dbg*/) {
  auto fn = NormalizeMockPath(fname);
  MutexLock lock(&mutex_);
  if (file_map_.find(fn) == file_map_.end()) {
    *result = nullptr;
    return IOStatus::PathNotFound(fn);
  }
  auto* f = file_map_[fn];
  if (f->is_lock_file()) {
    return IOStatus::InvalidArgument(fn, "Cannot open a lock file.");
  } else if (file_opts.use_direct_reads && !supports_direct_io_) {
    return IOStatus::NotSupported("Direct I/O Not Supported");
  } else {
    result->reset(new MockRandomAccessFile(f, file_opts));
    return IOStatus::OK();
  }
}

IOStatus MockFileSystem::NewRandomRWFile(
    const std::string& fname, const FileOptions& /*file_opts*/,
    std::unique_ptr<FSRandomRWFile>* result, IODebugContext* /*dbg*/) {
  auto fn = NormalizeMockPath(fname);
  MutexLock lock(&mutex_);
  if (file_map_.find(fn) == file_map_.end()) {
    *result = nullptr;
    return IOStatus::PathNotFound(fn);
  }
  auto* f = file_map_[fn];
  if (f->is_lock_file()) {
    return IOStatus::InvalidArgument(fn, "Cannot open a lock file.");
  }
  result->reset(new MockRandomRWFile(f));
  return IOStatus::OK();
}

IOStatus MockFileSystem::ReuseWritableFile(
    const std::string& fname, const std::string& old_fname,
    const FileOptions& options, std::unique_ptr<FSWritableFile>* result,
    IODebugContext* dbg) {
  auto s = RenameFile(old_fname, fname, IOOptions(), dbg);
  if (!s.ok()) {
    return s;
  } else {
    result->reset();
    return NewWritableFile(fname, options, result, dbg);
  }
}

IOStatus MockFileSystem::NewWritableFile(
    const std::string& fname, const FileOptions& file_opts,
    std::unique_ptr<FSWritableFile>* result, IODebugContext* /*dbg*/) {
  auto fn = NormalizeMockPath(fname);
  MutexLock lock(&mutex_);
  if (file_map_.find(fn) != file_map_.end()) {
    DeleteFileInternal(fn);
  }
  MemFile* file = new MemFile(clock_, fn, false);
  file->Ref();
  file_map_[fn] = file;
  if (file_opts.use_direct_writes && !supports_direct_io_) {
    return IOStatus::NotSupported("Direct I/O Not Supported");
  } else {
    result->reset(new MockWritableFile(file, file_opts));
    return IOStatus::OK();
  }
}

IOStatus MockFileSystem::ReopenWritableFile(
    const std::string& fname, const FileOptions& file_opts,
    std::unique_ptr<FSWritableFile>* result, IODebugContext* /*dbg*/) {
  auto fn = NormalizeMockPath(fname);
  MutexLock lock(&mutex_);
  MemFile* file = nullptr;
  if (file_map_.find(fn) == file_map_.end()) {
    file = new MemFile(clock_, fn, false);
    // Only take a reference when we create the file objectt
    file->Ref();
    file_map_[fn] = file;
  } else {
    file = file_map_[fn];
  }
  if (file_opts.use_direct_writes && !supports_direct_io_) {
    return IOStatus::NotSupported("Direct I/O Not Supported");
  } else {
    result->reset(new MockWritableFile(file, file_opts));
    return IOStatus::OK();
  }
}

IOStatus MockFileSystem::NewDirectory(const std::string& /*name*/,
                                      const IOOptions& /*io_opts*/,
                                      std::unique_ptr<FSDirectory>* result,
                                      IODebugContext* /*dbg*/) {
  result->reset(new MockEnvDirectory());
  return IOStatus::OK();
}

IOStatus MockFileSystem::FileExists(const std::string& fname,
                                    const IOOptions& /*io_opts*/,
                                    IODebugContext* /*dbg*/) {
  auto fn = NormalizeMockPath(fname);
  MutexLock lock(&mutex_);
  if (file_map_.find(fn) != file_map_.end()) {
    // File exists
    return IOStatus::OK();
  }
  // Now also check if fn exists as a dir
  for (const auto& iter : file_map_) {
    const std::string& filename = iter.first;
    if (filename.size() >= fn.size() + 1 && filename[fn.size()] == '/' &&
        Slice(filename).starts_with(Slice(fn))) {
      return IOStatus::OK();
    }
  }
  return IOStatus::NotFound();
}

bool MockFileSystem::GetChildrenInternal(const std::string& dir,
                                         std::vector<std::string>* result) {
  auto d = NormalizeMockPath(dir);
  bool found_dir = false;
  result->clear();
  for (const auto& iter : file_map_) {
    const std::string& filename = iter.first;

    if (filename == d) {
      found_dir = true;
    } else if (filename.size() >= d.size() + 1 && filename[d.size()] == '/' &&
               Slice(filename).starts_with(Slice(d))) {
      found_dir = true;
      size_t next_slash = filename.find('/', d.size() + 1);
      if (next_slash != std::string::npos) {
        result->push_back(
            filename.substr(d.size() + 1, next_slash - d.size() - 1));
      } else {
        result->push_back(filename.substr(d.size() + 1));
      }
    }
  }
  result->erase(std::unique(result->begin(), result->end()), result->end());
  return found_dir;
}

IOStatus MockFileSystem::GetChildren(const std::string& dir,
                                     const IOOptions& /*options*/,
                                     std::vector<std::string>* result,
                                     IODebugContext* /*dbg*/) {
  MutexLock lock(&mutex_);
  bool found_dir = GetChildrenInternal(dir, result);
#ifndef __clang_analyzer__
  return found_dir ? IOStatus::OK() : IOStatus::NotFound(dir);
#else
  return found_dir ? IOStatus::OK() : IOStatus::NotFound();
#endif
}

void MockFileSystem::DeleteFileInternal(const std::string& fname) {
  assert(fname == NormalizeMockPath(fname));
  const auto& pair = file_map_.find(fname);
  if (pair != file_map_.end()) {
    pair->second->Unref();
    file_map_.erase(fname);
  }
}

IOStatus MockFileSystem::DeleteFile(const std::string& fname,
                                    const IOOptions& /*options*/,
                                    IODebugContext* /*dbg*/) {
  auto fn = NormalizeMockPath(fname);
  MutexLock lock(&mutex_);
  if (file_map_.find(fn) == file_map_.end()) {
    return IOStatus::PathNotFound(fn);
  }

  DeleteFileInternal(fn);
  return IOStatus::OK();
}

IOStatus MockFileSystem::Truncate(const std::string& fname, size_t size,
                                  const IOOptions& options,
                                  IODebugContext* dbg) {
  auto fn = NormalizeMockPath(fname);
  MutexLock lock(&mutex_);
  auto iter = file_map_.find(fn);
  if (iter == file_map_.end()) {
    return IOStatus::PathNotFound(fn);
  }
  iter->second->Truncate(size, options, dbg);
  return IOStatus::OK();
}

IOStatus MockFileSystem::CreateDir(const std::string& dirname,
                                   const IOOptions& /*options*/,
                                   IODebugContext* /*dbg*/) {
  auto dn = NormalizeMockPath(dirname);
  MutexLock lock(&mutex_);
  if (file_map_.find(dn) == file_map_.end()) {
    MemFile* file = new MemFile(clock_, dn, false);
    file->Ref();
    file_map_[dn] = file;
  } else {
    return IOStatus::IOError();
  }
  return IOStatus::OK();
}

IOStatus MockFileSystem::CreateDirIfMissing(const std::string& dirname,
                                            const IOOptions& options,
                                            IODebugContext* dbg) {
  CreateDir(dirname, options, dbg).PermitUncheckedError();
  return IOStatus::OK();
}

IOStatus MockFileSystem::DeleteDir(const std::string& dirname,
                                   const IOOptions& /*options*/,
                                   IODebugContext* /*dbg*/) {
  auto dir = NormalizeMockPath(dirname);
  MutexLock lock(&mutex_);
  if (file_map_.find(dir) == file_map_.end()) {
    return IOStatus::PathNotFound(dir);
  } else {
    std::vector<std::string> children;
    if (GetChildrenInternal(dir, &children)) {
      for (const auto& child : children) {
        DeleteFileInternal(child);
      }
    }
    DeleteFileInternal(dir);
    return IOStatus::OK();
  }
}

IOStatus MockFileSystem::GetFileSize(const std::string& fname,
                                     const IOOptions& /*options*/,
                                     uint64_t* file_size,
                                     IODebugContext* /*dbg*/) {
  auto fn = NormalizeMockPath(fname);
  MutexLock lock(&mutex_);
  auto iter = file_map_.find(fn);
  if (iter == file_map_.end()) {
    return IOStatus::PathNotFound(fn);
  }

  *file_size = iter->second->Size();
  return IOStatus::OK();
}

IOStatus MockFileSystem::GetFileModificationTime(const std::string& fname,
                                                 const IOOptions& /*options*/,
                                                 uint64_t* time,
                                                 IODebugContext* /*dbg*/) {
  auto fn = NormalizeMockPath(fname);
  MutexLock lock(&mutex_);
  auto iter = file_map_.find(fn);
  if (iter == file_map_.end()) {
    return IOStatus::PathNotFound(fn);
  }
  *time = iter->second->ModifiedTime();
  return IOStatus::OK();
}

bool MockFileSystem::RenameFileInternal(const std::string& src,
                                        const std::string& dest) {
  if (file_map_.find(src) == file_map_.end()) {
    return false;
  } else {
    std::vector<std::string> children;
    if (GetChildrenInternal(src, &children)) {
      for (const auto& child : children) {
        RenameFileInternal(src + "/" + child, dest + "/" + child);
      }
    }
    DeleteFileInternal(dest);
    file_map_[dest] = file_map_[src];
    file_map_.erase(src);
    return true;
  }
}

IOStatus MockFileSystem::RenameFile(const std::string& src,
                                    const std::string& dest,
                                    const IOOptions& /*options*/,
                                    IODebugContext* /*dbg*/) {
  auto s = NormalizeMockPath(src);
  auto t = NormalizeMockPath(dest);
  MutexLock lock(&mutex_);
  bool found = RenameFileInternal(s, t);
  if (!found) {
    return IOStatus::PathNotFound(s);
  } else {
    return IOStatus::OK();
  }
}

IOStatus MockFileSystem::LinkFile(const std::string& src,
                                  const std::string& dest,
                                  const IOOptions& /*options*/,
                                  IODebugContext* /*dbg*/) {
  auto s = NormalizeMockPath(src);
  auto t = NormalizeMockPath(dest);
  MutexLock lock(&mutex_);
  if (file_map_.find(s) == file_map_.end()) {
    return IOStatus::PathNotFound(s);
  }

  DeleteFileInternal(t);
  file_map_[t] = file_map_[s];
  file_map_[t]->Ref();  // Otherwise it might get deleted when noone uses s
  return IOStatus::OK();
}

IOStatus MockFileSystem::NewLogger(const std::string& fname,
                                   const IOOptions& io_opts,
                                   std::shared_ptr<Logger>* result,
                                   IODebugContext* dbg) {
  auto fn = NormalizeMockPath(fname);
  MutexLock lock(&mutex_);
  auto iter = file_map_.find(fn);
  MemFile* file = nullptr;
  if (iter == file_map_.end()) {
    file = new MemFile(clock_, fn, false);
    file->Ref();
    file_map_[fn] = file;
  } else {
    file = iter->second;
  }
  std::unique_ptr<FSWritableFile> f(new MockWritableFile(file, FileOptions()));
  result->reset(new TestMemLogger(std::move(f), clock_, io_opts, dbg));
  return IOStatus::OK();
}

IOStatus MockFileSystem::LockFile(const std::string& fname,
                                  const IOOptions& /*options*/,
                                  FileLock** flock, IODebugContext* /*dbg*/) {
  auto fn = NormalizeMockPath(fname);
  {
    MutexLock lock(&mutex_);
    if (file_map_.find(fn) != file_map_.end()) {
      if (!file_map_[fn]->is_lock_file()) {
        return IOStatus::InvalidArgument(fname, "Not a lock file.");
      }
      if (!file_map_[fn]->Lock()) {
        return IOStatus::IOError(fn, "lock is already held.");
      }
    } else {
      auto* file = new MemFile(clock_, fn, true);
      file->Ref();
      file->Lock();
      file_map_[fn] = file;
    }
  }
  *flock = new MockEnvFileLock(fn);
  return IOStatus::OK();
}

IOStatus MockFileSystem::UnlockFile(FileLock* flock,
                                    const IOOptions& /*options*/,
                                    IODebugContext* /*dbg*/) {
  std::string fn = static_cast_with_check<MockEnvFileLock>(flock)->FileName();
  {
    MutexLock lock(&mutex_);
    if (file_map_.find(fn) != file_map_.end()) {
      if (!file_map_[fn]->is_lock_file()) {
        return IOStatus::InvalidArgument(fn, "Not a lock file.");
      }
      file_map_[fn]->Unlock();
    }
  }
  delete flock;
  return IOStatus::OK();
}

IOStatus MockFileSystem::GetTestDirectory(const IOOptions& /*options*/,
                                          std::string* path,
                                          IODebugContext* /*dbg*/) {
  *path = "/test";
  return IOStatus::OK();
}

Status MockFileSystem::CorruptBuffer(const std::string& fname) {
  auto fn = NormalizeMockPath(fname);
  MutexLock lock(&mutex_);
  auto iter = file_map_.find(fn);
  if (iter == file_map_.end()) {
    return Status::IOError(fn, "File not found");
  }
  iter->second->CorruptBuffer();
  return Status::OK();
}

MockEnv::MockEnv(Env* env, const std::shared_ptr<FileSystem>& fs,
                 const std::shared_ptr<SystemClock>& clock)
    : CompositeEnvWrapper(env, fs, clock) {}

MockEnv* MockEnv::Create(Env* env) {
  auto clock =
      std::make_shared<EmulatedSystemClock>(env->GetSystemClock(), true);
  return MockEnv::Create(env, clock);
}

MockEnv* MockEnv::Create(Env* env, const std::shared_ptr<SystemClock>& clock) {
  auto fs = std::make_shared<MockFileSystem>(clock);
  return new MockEnv(env, fs, clock);
}

Status MockEnv::CorruptBuffer(const std::string& fname) {
  auto mock = static_cast_with_check<MockFileSystem>(GetFileSystem().get());
  return mock->CorruptBuffer(fname);
}

#ifndef ROCKSDB_LITE
// This is to maintain the behavior before swithcing from InMemoryEnv to MockEnv
Env* NewMemEnv(Env* base_env) { return MockEnv::Create(base_env); }

#else  // ROCKSDB_LITE

Env* NewMemEnv(Env* /*base_env*/) { return nullptr; }

#endif  // !ROCKSDB_LITE

}  // namespace ROCKSDB_NAMESPACE
back to top