https://github.com/facebook/rocksdb
Raw File
Tip revision: c2452696d9fed7f43f6b2db89d5bfeace5b9d23e authored by Levi Tamasi on 03 November 2022, 20:51:23 UTC
Bump up version to 7.5.4
Tip revision: c245269
composite_env.cc
// Copyright (c) 2019-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
#include "env/composite_env_wrapper.h"
#include "rocksdb/utilities/options_type.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {
namespace {
// The CompositeEnvWrapper class provides an interface that is compatible
// with the old monolithic Env API, and an implementation that wraps around
// the new Env that provides threading and other OS related functionality, and
// the new FileSystem API that provides storage functionality. By
// providing the old Env interface, it allows the rest of RocksDB code to
// be agnostic of whether the underlying Env implementation is a monolithic
// Env or an Env + FileSystem. In the former case, the user will specify
// Options::env only, whereas in the latter case, the user will specify
// Options::env and Options::file_system.

class CompositeSequentialFileWrapper : public SequentialFile {
 public:
  explicit CompositeSequentialFileWrapper(
      std::unique_ptr<FSSequentialFile>& target)
      : target_(std::move(target)) {}

  Status Read(size_t n, Slice* result, char* scratch) override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->Read(n, io_opts, result, scratch, &dbg);
  }
  Status Skip(uint64_t n) override { return target_->Skip(n); }
  bool use_direct_io() const override { return target_->use_direct_io(); }
  size_t GetRequiredBufferAlignment() const override {
    return target_->GetRequiredBufferAlignment();
  }
  Status InvalidateCache(size_t offset, size_t length) override {
    return target_->InvalidateCache(offset, length);
  }
  Status PositionedRead(uint64_t offset, size_t n, Slice* result,
                        char* scratch) override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->PositionedRead(offset, n, io_opts, result, scratch, &dbg);
  }

 private:
  std::unique_ptr<FSSequentialFile> target_;
};

class CompositeRandomAccessFileWrapper : public RandomAccessFile {
 public:
  explicit CompositeRandomAccessFileWrapper(
      std::unique_ptr<FSRandomAccessFile>& target)
      : target_(std::move(target)) {}

  Status Read(uint64_t offset, size_t n, Slice* result,
              char* scratch) const override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->Read(offset, n, io_opts, result, scratch, &dbg);
  }
  Status MultiRead(ReadRequest* reqs, size_t num_reqs) override {
    IOOptions io_opts;
    IODebugContext dbg;
    std::vector<FSReadRequest> fs_reqs;
    Status status;

    fs_reqs.resize(num_reqs);
    for (size_t i = 0; i < num_reqs; ++i) {
      fs_reqs[i].offset = reqs[i].offset;
      fs_reqs[i].len = reqs[i].len;
      fs_reqs[i].scratch = reqs[i].scratch;
      fs_reqs[i].status = IOStatus::OK();
    }
    status = target_->MultiRead(fs_reqs.data(), num_reqs, io_opts, &dbg);
    for (size_t i = 0; i < num_reqs; ++i) {
      reqs[i].result = fs_reqs[i].result;
      reqs[i].status = fs_reqs[i].status;
    }
    return status;
  }
  Status Prefetch(uint64_t offset, size_t n) override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->Prefetch(offset, n, io_opts, &dbg);
  }
  size_t GetUniqueId(char* id, size_t max_size) const override {
    return target_->GetUniqueId(id, max_size);
  }
  void Hint(AccessPattern pattern) override {
    target_->Hint((FSRandomAccessFile::AccessPattern)pattern);
  }
  bool use_direct_io() const override { return target_->use_direct_io(); }
  size_t GetRequiredBufferAlignment() const override {
    return target_->GetRequiredBufferAlignment();
  }
  Status InvalidateCache(size_t offset, size_t length) override {
    return target_->InvalidateCache(offset, length);
  }

 private:
  std::unique_ptr<FSRandomAccessFile> target_;
};

class CompositeWritableFileWrapper : public WritableFile {
 public:
  explicit CompositeWritableFileWrapper(std::unique_ptr<FSWritableFile>& t)
      : target_(std::move(t)) {}

  Status Append(const Slice& data) override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->Append(data, io_opts, &dbg);
  }
  Status Append(const Slice& data,
                const DataVerificationInfo& verification_info) override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->Append(data, io_opts, verification_info, &dbg);
  }
  Status PositionedAppend(const Slice& data, uint64_t offset) override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->PositionedAppend(data, offset, io_opts, &dbg);
  }
  Status PositionedAppend(
      const Slice& data, uint64_t offset,
      const DataVerificationInfo& verification_info) override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->PositionedAppend(data, offset, io_opts, verification_info,
                                     &dbg);
  }
  Status Truncate(uint64_t size) override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->Truncate(size, io_opts, &dbg);
  }
  Status Close() override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->Close(io_opts, &dbg);
  }
  Status Flush() override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->Flush(io_opts, &dbg);
  }
  Status Sync() override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->Sync(io_opts, &dbg);
  }
  Status Fsync() override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->Fsync(io_opts, &dbg);
  }
  bool IsSyncThreadSafe() const override { return target_->IsSyncThreadSafe(); }

  bool use_direct_io() const override { return target_->use_direct_io(); }

  size_t GetRequiredBufferAlignment() const override {
    return target_->GetRequiredBufferAlignment();
  }

  void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override {
    target_->SetWriteLifeTimeHint(hint);
  }

  Env::WriteLifeTimeHint GetWriteLifeTimeHint() override {
    return target_->GetWriteLifeTimeHint();
  }

  uint64_t GetFileSize() override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->GetFileSize(io_opts, &dbg);
  }

  void SetPreallocationBlockSize(size_t size) override {
    target_->SetPreallocationBlockSize(size);
  }

  void GetPreallocationStatus(size_t* block_size,
                              size_t* last_allocated_block) override {
    target_->GetPreallocationStatus(block_size, last_allocated_block);
  }

  size_t GetUniqueId(char* id, size_t max_size) const override {
    return target_->GetUniqueId(id, max_size);
  }

  Status InvalidateCache(size_t offset, size_t length) override {
    return target_->InvalidateCache(offset, length);
  }

  Status RangeSync(uint64_t offset, uint64_t nbytes) override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->RangeSync(offset, nbytes, io_opts, &dbg);
  }

  void PrepareWrite(size_t offset, size_t len) override {
    IOOptions io_opts;
    IODebugContext dbg;
    target_->PrepareWrite(offset, len, io_opts, &dbg);
  }

  Status Allocate(uint64_t offset, uint64_t len) override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->Allocate(offset, len, io_opts, &dbg);
  }

  std::unique_ptr<FSWritableFile>* target() { return &target_; }

 private:
  std::unique_ptr<FSWritableFile> target_;
};

class CompositeRandomRWFileWrapper : public RandomRWFile {
 public:
  explicit CompositeRandomRWFileWrapper(std::unique_ptr<FSRandomRWFile>& target)
      : target_(std::move(target)) {}

  bool use_direct_io() const override { return target_->use_direct_io(); }
  size_t GetRequiredBufferAlignment() const override {
    return target_->GetRequiredBufferAlignment();
  }
  Status Write(uint64_t offset, const Slice& data) override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->Write(offset, data, io_opts, &dbg);
  }
  Status Read(uint64_t offset, size_t n, Slice* result,
              char* scratch) const override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->Read(offset, n, io_opts, result, scratch, &dbg);
  }
  Status Flush() override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->Flush(io_opts, &dbg);
  }
  Status Sync() override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->Sync(io_opts, &dbg);
  }
  Status Fsync() override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->Fsync(io_opts, &dbg);
  }
  Status Close() override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->Close(io_opts, &dbg);
  }

 private:
  std::unique_ptr<FSRandomRWFile> target_;
};

class CompositeDirectoryWrapper : public Directory {
 public:
  explicit CompositeDirectoryWrapper(std::unique_ptr<FSDirectory>& target)
      : target_(std::move(target)) {}

  Status Fsync() override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->FsyncWithDirOptions(io_opts, &dbg, DirFsyncOptions());
  }

  Status Close() override {
    IOOptions io_opts;
    IODebugContext dbg;
    return target_->Close(io_opts, &dbg);
  }

  size_t GetUniqueId(char* id, size_t max_size) const override {
    return target_->GetUniqueId(id, max_size);
  }

 private:
  std::unique_ptr<FSDirectory> target_;
};
}  // namespace

Status CompositeEnv::NewSequentialFile(const std::string& f,
                                       std::unique_ptr<SequentialFile>* r,
                                       const EnvOptions& options) {
  IODebugContext dbg;
  std::unique_ptr<FSSequentialFile> file;
  Status status;
  status =
      file_system_->NewSequentialFile(f, FileOptions(options), &file, &dbg);
  if (status.ok()) {
    r->reset(new CompositeSequentialFileWrapper(file));
  }
  return status;
}

Status CompositeEnv::NewRandomAccessFile(const std::string& f,
                                         std::unique_ptr<RandomAccessFile>* r,
                                         const EnvOptions& options) {
  IODebugContext dbg;
  std::unique_ptr<FSRandomAccessFile> file;
  Status status;
  status =
      file_system_->NewRandomAccessFile(f, FileOptions(options), &file, &dbg);
  if (status.ok()) {
    r->reset(new CompositeRandomAccessFileWrapper(file));
  }
  return status;
}

Status CompositeEnv::NewWritableFile(const std::string& f,
                                     std::unique_ptr<WritableFile>* r,
                                     const EnvOptions& options) {
  IODebugContext dbg;
  std::unique_ptr<FSWritableFile> file;
  Status status;
  status = file_system_->NewWritableFile(f, FileOptions(options), &file, &dbg);
  if (status.ok()) {
    r->reset(new CompositeWritableFileWrapper(file));
  }
  return status;
}

Status CompositeEnv::ReopenWritableFile(const std::string& fname,
                                        std::unique_ptr<WritableFile>* result,
                                        const EnvOptions& options) {
  IODebugContext dbg;
  Status status;
  std::unique_ptr<FSWritableFile> file;
  status = file_system_->ReopenWritableFile(fname, FileOptions(options), &file,
                                            &dbg);
  if (status.ok()) {
    result->reset(new CompositeWritableFileWrapper(file));
  }
  return status;
}

Status CompositeEnv::ReuseWritableFile(const std::string& fname,
                                       const std::string& old_fname,
                                       std::unique_ptr<WritableFile>* r,
                                       const EnvOptions& options) {
  IODebugContext dbg;
  Status status;
  std::unique_ptr<FSWritableFile> file;
  status = file_system_->ReuseWritableFile(fname, old_fname,
                                           FileOptions(options), &file, &dbg);
  if (status.ok()) {
    r->reset(new CompositeWritableFileWrapper(file));
  }
  return status;
}

Status CompositeEnv::NewRandomRWFile(const std::string& fname,
                                     std::unique_ptr<RandomRWFile>* result,
                                     const EnvOptions& options) {
  IODebugContext dbg;
  std::unique_ptr<FSRandomRWFile> file;
  Status status;
  status =
      file_system_->NewRandomRWFile(fname, FileOptions(options), &file, &dbg);
  if (status.ok()) {
    result->reset(new CompositeRandomRWFileWrapper(file));
  }
  return status;
}

Status CompositeEnv::NewDirectory(const std::string& name,
                                  std::unique_ptr<Directory>* result) {
  IOOptions io_opts;
  IODebugContext dbg;
  std::unique_ptr<FSDirectory> dir;
  Status status;
  status = file_system_->NewDirectory(name, io_opts, &dir, &dbg);
  if (status.ok()) {
    result->reset(new CompositeDirectoryWrapper(dir));
  }
  return status;
}

namespace {
static std::unordered_map<std::string, OptionTypeInfo> env_wrapper_type_info = {
#ifndef ROCKSDB_LITE
    {"target",
     OptionTypeInfo(0, OptionType::kUnknown, OptionVerificationType::kByName,
                    OptionTypeFlags::kDontSerialize)
         .SetParseFunc([](const ConfigOptions& opts,
                          const std::string& /*name*/, const std::string& value,
                          void* addr) {
           auto target = static_cast<EnvWrapper::Target*>(addr);
           return Env::CreateFromString(opts, value, &(target->env),
                                        &(target->guard));
         })
         .SetEqualsFunc([](const ConfigOptions& opts,
                           const std::string& /*name*/, const void* addr1,
                           const void* addr2, std::string* mismatch) {
           const auto target1 = static_cast<const EnvWrapper::Target*>(addr1);
           const auto target2 = static_cast<const EnvWrapper::Target*>(addr2);
           if (target1->env != nullptr) {
             return target1->env->AreEquivalent(opts, target2->env, mismatch);
           } else {
             return (target2->env == nullptr);
           }
         })
         .SetPrepareFunc([](const ConfigOptions& opts,
                            const std::string& /*name*/, void* addr) {
           auto target = static_cast<EnvWrapper::Target*>(addr);
           if (target->guard.get() != nullptr) {
             target->env = target->guard.get();
           } else if (target->env == nullptr) {
             target->env = Env::Default();
           }
           return target->env->PrepareOptions(opts);
         })
         .SetValidateFunc([](const DBOptions& db_opts,
                             const ColumnFamilyOptions& cf_opts,
                             const std::string& /*name*/, const void* addr) {
           const auto target = static_cast<const EnvWrapper::Target*>(addr);
           if (target->env == nullptr) {
             return Status::InvalidArgument("Target Env not specified");
           } else {
             return target->env->ValidateOptions(db_opts, cf_opts);
           }
         })},
#endif  // ROCKSDB_LITE
};
static std::unordered_map<std::string, OptionTypeInfo>
    composite_fs_wrapper_type_info = {
#ifndef ROCKSDB_LITE
        {"file_system",
         OptionTypeInfo::AsCustomSharedPtr<FileSystem>(
             0, OptionVerificationType::kByName, OptionTypeFlags::kNone)},
#endif  // ROCKSDB_LITE
};

static std::unordered_map<std::string, OptionTypeInfo>
    composite_clock_wrapper_type_info = {
#ifndef ROCKSDB_LITE
        {"clock",
         OptionTypeInfo::AsCustomSharedPtr<SystemClock>(
             0, OptionVerificationType::kByName, OptionTypeFlags::kNone)},
#endif  // ROCKSDB_LITE
};

}  // namespace

std::unique_ptr<Env> NewCompositeEnv(const std::shared_ptr<FileSystem>& fs) {
  return std::unique_ptr<Env>(new CompositeEnvWrapper(Env::Default(), fs));
}

CompositeEnvWrapper::CompositeEnvWrapper(Env* env,
                                         const std::shared_ptr<FileSystem>& fs,
                                         const std::shared_ptr<SystemClock>& sc)
    : CompositeEnv(fs, sc), target_(env) {
  RegisterOptions("", &target_, &env_wrapper_type_info);
  RegisterOptions("", &file_system_, &composite_fs_wrapper_type_info);
  RegisterOptions("", &system_clock_, &composite_clock_wrapper_type_info);
}

CompositeEnvWrapper::CompositeEnvWrapper(const std::shared_ptr<Env>& env,
                                         const std::shared_ptr<FileSystem>& fs,
                                         const std::shared_ptr<SystemClock>& sc)
    : CompositeEnv(fs, sc), target_(env) {
  RegisterOptions("", &target_, &env_wrapper_type_info);
  RegisterOptions("", &file_system_, &composite_fs_wrapper_type_info);
  RegisterOptions("", &system_clock_, &composite_clock_wrapper_type_info);
}

Status CompositeEnvWrapper::PrepareOptions(const ConfigOptions& options) {
  target_.Prepare();
  if (file_system_ == nullptr) {
    file_system_ = target_.env->GetFileSystem();
  }
  if (system_clock_ == nullptr) {
    system_clock_ = target_.env->GetSystemClock();
  }
  return Env::PrepareOptions(options);
}

#ifndef ROCKSDB_LITE
std::string CompositeEnvWrapper::SerializeOptions(
    const ConfigOptions& config_options, const std::string& header) const {
  auto options = CompositeEnv::SerializeOptions(config_options, header);
  if (target_.env != nullptr && target_.env != Env::Default()) {
    options.append("target=");
    options.append(target_.env->ToString(config_options));
  }
  return options;
}
#endif  // ROCKSDB_LITE

EnvWrapper::EnvWrapper(Env* t) : target_(t) {
  RegisterOptions("", &target_, &env_wrapper_type_info);
}

EnvWrapper::EnvWrapper(std::unique_ptr<Env>&& t) : target_(std::move(t)) {
  RegisterOptions("", &target_, &env_wrapper_type_info);
}

EnvWrapper::EnvWrapper(const std::shared_ptr<Env>& t) : target_(t) {
  RegisterOptions("", &target_, &env_wrapper_type_info);
}

EnvWrapper::~EnvWrapper() {}

Status EnvWrapper::PrepareOptions(const ConfigOptions& options) {
  target_.Prepare();
  return Env::PrepareOptions(options);
}

#ifndef ROCKSDB_LITE
std::string EnvWrapper::SerializeOptions(const ConfigOptions& config_options,
                                         const std::string& header) const {
  auto parent = Env::SerializeOptions(config_options, "");
  if (config_options.IsShallow() || target_.env == nullptr ||
      target_.env == Env::Default()) {
    return parent;
  } else {
    std::string result = header;
    if (!StartsWith(parent, OptionTypeInfo::kIdPropName())) {
      result.append(OptionTypeInfo::kIdPropName()).append("=");
    }
    result.append(parent);
    if (!EndsWith(result, config_options.delimiter)) {
      result.append(config_options.delimiter);
    }
    result.append("target=").append(target_.env->ToString(config_options));
    return result;
  }
}
#endif  // ROCKSDB_LITE

}  // namespace ROCKSDB_NAMESPACE
back to top