https://github.com/facebook/rocksdb
Raw File
env_librados.cc
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
#include "rocksdb/utilities/env_librados.h"
#include "util/random.h"
#include <mutex>
#include <cstdlib>

namespace ROCKSDB_NAMESPACE {
/* GLOBAL DIFINE */
// #define DEBUG
#ifdef DEBUG
#include <cstdio>
#include <sys/syscall.h>
#include <unistd.h>
#define LOG_DEBUG(...)  do{\
    printf("[%ld:%s:%i:%s]", syscall(SYS_gettid), __FILE__, __LINE__, __FUNCTION__);\
    printf(__VA_ARGS__);\
  }while(0)
#else
#define LOG_DEBUG(...)
#endif

/* GLOBAL CONSTANT */
const char *default_db_name     = "default_envlibrados_db";
const char *default_pool_name   = "default_envlibrados_pool";
const char *default_config_path = "CEPH_CONFIG_PATH";           // the env variable name of ceph configure file
// maximum dir/file that can store in the fs
const int MAX_ITEMS_IN_FS = 1 << 30;
// root dir tag
const std::string ROOT_DIR_KEY = "/";
const std::string DIR_ID_VALUE = "<DIR>";

/**
 * @brief convert error code to status
 * @details Convert internal linux error code to Status
 *
 * @param r [description]
 * @return [description]
 */
Status err_to_status(int r)
{
  switch (r) {
  case 0:
    return Status::OK();
  case -ENOENT:
    return Status::IOError();
  case -ENODATA:
  case -ENOTDIR:
    return Status::NotFound(Status::kNone);
  case -EINVAL:
    return Status::InvalidArgument(Status::kNone);
  case -EIO:
    return Status::IOError(Status::kNone);
  default:
    // FIXME :(
    assert(0 == "unrecognized error code");
    return Status::NotSupported(Status::kNone);
  }
}

/**
 * @brief split file path into dir path and file name
 * @details
 * Because rocksdb only need a 2-level structure (dir/file), all input path will be shortened to dir/file format
 *  For example:
 *    b/c => dir '/b', file 'c'
 *    /a/b/c => dir '/b', file 'c'
 *
 * @param fn [description]
 * @param dir [description]
 * @param file [description]
 */
void split(const std::string &fn, std::string *dir, std::string *file) {
  LOG_DEBUG("[IN]%s\n", fn.c_str());
  int pos = fn.size() - 1;
  while ('/' == fn[pos]) --pos;
  size_t fstart = fn.rfind('/', pos);
  *file = fn.substr(fstart + 1, pos - fstart);

  pos = fstart;
  while (pos >= 0 && '/' == fn[pos]) --pos;

  if (pos < 0) {
    *dir = "/";
  } else {
    size_t dstart = fn.rfind('/', pos);
    *dir = fn.substr(dstart + 1, pos - dstart);
    *dir = std::string("/") + *dir;
  }

  LOG_DEBUG("[OUT]%s | %s\n", dir->c_str(), file->c_str());
}

// A file abstraction for reading sequentially through a file
class LibradosSequentialFile : public SequentialFile {
  librados::IoCtx * _io_ctx;
  std::string _fid;
  std::string _hint;
  int _offset;
public:
  LibradosSequentialFile(librados::IoCtx * io_ctx, std::string fid, std::string hint):
    _io_ctx(io_ctx), _fid(fid), _hint(hint), _offset(0) {}

  ~LibradosSequentialFile() {}

  /**
   * @brief read file
   * @details
   *  Read up to "n" bytes from the file.  "scratch[0..n-1]" may be
   *  written by this routine.  Sets "*result" to the data that was
   *  read (including if fewer than "n" bytes were successfully read).
   *  May set "*result" to point at data in "scratch[0..n-1]", so
   *  "scratch[0..n-1]" must be live when "*result" is used.
   *  If an error was encountered, returns a non-OK status.
   *
   *  REQUIRES: External synchronization
   *
   * @param n [description]
   * @param result [description]
   * @param scratch [description]
   * @return [description]
   */
  Status Read(size_t n, Slice* result, char* scratch) {
    LOG_DEBUG("[IN]%i\n", (int)n);
    librados::bufferlist buffer;
    Status s;
    int r = _io_ctx->read(_fid, buffer, n, _offset);
    if (r >= 0) {
      buffer.begin().copy(r, scratch);
      *result = Slice(scratch, r);
      _offset += r;
      s = Status::OK();
    } else {
      s = err_to_status(r);
      if (s == Status::IOError()) {
        *result = Slice();
        s = Status::OK();
      }
    }
    LOG_DEBUG("[OUT]%s, %i, %s\n", s.ToString().c_str(), (int)r, buffer.c_str());
    return s;
  }

  /**
   * @brief skip "n" bytes from the file
   * @details
   *  Skip "n" bytes from the file. This is guaranteed to be no
   *  slower that reading the same data, but may be faster.
   *
   *  If end of file is reached, skipping will stop at the end of the
   *  file, and Skip will return OK.
   *
   *  REQUIRES: External synchronization
   *
   * @param n [description]
   * @return [description]
   */
  Status Skip(uint64_t n) {
    _offset += n;
    return Status::OK();
  }

  /**
   * @brief noop
   * @details
   *  rocksdb has it's own caching capabilities that we should be able to use,
   *  without relying on a cache here. This can safely be a no-op.
   *
   * @param offset [description]
   * @param length [description]
   *
   * @return [description]
   */
  Status InvalidateCache(size_t /*offset*/, size_t /*length*/) {
    return Status::OK();
  }
};

// A file abstraction for randomly reading the contents of a file.
class LibradosRandomAccessFile : public RandomAccessFile {
  librados::IoCtx * _io_ctx;
  std::string _fid;
  std::string _hint;
public:
  LibradosRandomAccessFile(librados::IoCtx * io_ctx, std::string fid, std::string hint):
    _io_ctx(io_ctx), _fid(fid), _hint(hint) {}

  ~LibradosRandomAccessFile() {}

  /**
   * @brief read file
   * @details similar to LibradosSequentialFile::Read
   *
   * @param offset [description]
   * @param n [description]
   * @param result [description]
   * @param scratch [description]
   * @return [description]
   */
  Status Read(uint64_t offset, size_t n, Slice* result,
              char* scratch) const {
    LOG_DEBUG("[IN]%i\n", (int)n);
    librados::bufferlist buffer;
    Status s;
    int r = _io_ctx->read(_fid, buffer, n, offset);
    if (r >= 0) {
      buffer.begin().copy(r, scratch);
      *result = Slice(scratch, r);
      s = Status::OK();
    } else {
      s = err_to_status(r);
      if (s == Status::IOError()) {
        *result = Slice();
        s = Status::OK();
      }
    }
    LOG_DEBUG("[OUT]%s, %i, %s\n", s.ToString().c_str(), (int)r, buffer.c_str());
    return s;
  }

  /**
   * @brief [brief description]
   * @details Get unique id for each file and guarantee this id is different for each file
   *
   * @param id [description]
   * @param max_size max size of id, it shoud be larger than 16
   *
   * @return [description]
   */
  size_t GetUniqueId(char* id, size_t max_size) const {
    // All fid has the same db_id prefix, so we need to ignore db_id prefix
    size_t s = std::min(max_size, _fid.size());
    strncpy(id, _fid.c_str() + (_fid.size() - s), s);
    id[s - 1] = '\0';
    return s;
  };

  //enum AccessPattern { NORMAL, RANDOM, SEQUENTIAL, WILLNEED, DONTNEED };
  void Hint(AccessPattern /*pattern*/) { /* Do nothing */
  }

  /**
   * @brief noop
   * @details [long description]
   *
   * @param offset [description]
   * @param length [description]
   *
   * @return [description]
   */
  Status InvalidateCache(size_t /*offset*/, size_t /*length*/) {
    return Status::OK();
  }
};


// A file abstraction for sequential writing.  The implementation
// must provide buffering since callers may append small fragments
// at a time to the file.
class LibradosWritableFile : public WritableFile {
  librados::IoCtx * _io_ctx;
  std::string _fid;
  std::string _hint;
  const EnvLibrados * const _env;

  std::mutex _mutex;                 // used to protect modification of all following variables
  librados::bufferlist _buffer;      // write buffer
  uint64_t _buffer_size;             // write buffer size
  uint64_t _file_size;               // this file size doesn't include buffer size

  /**
   * @brief assuming caller holds lock
   * @details [long description]
   * @return [description]
   */
  int _SyncLocked() {
    // 1. sync append data to RADOS
    int r = _io_ctx->append(_fid, _buffer, _buffer_size);
    assert(r >= 0);

    // 2. update local variables
    if (0 == r) {
      _buffer.clear();
      _file_size += _buffer_size;
      _buffer_size = 0;
    }

    return r;
  }

 public:
  LibradosWritableFile(librados::IoCtx* io_ctx, std::string fid,
                       std::string hint, const EnvLibrados* const env,
                       const EnvOptions& options)
      : WritableFile(options),
        _io_ctx(io_ctx),
        _fid(fid),
        _hint(hint),
        _env(env),
        _buffer(),
        _buffer_size(0),
        _file_size(0) {
    int ret = _io_ctx->stat(_fid, &_file_size, nullptr);

    // if file not exist
    if (ret < 0) {
      _file_size = 0;
    }
  }

  ~LibradosWritableFile() {
    // sync before closeing writable file
    Sync();
  }

  using WritableFile::Append;
  /**
   * @brief append data to file
   * @details
   *  Append will save all written data in buffer util buffer size
   *  reaches buffer max size. Then, it will write buffer into rados
   *
   * @param data [description]
   * @return [description]
   */
  Status Append(const Slice& data) override {
    // append buffer
    LOG_DEBUG("[IN] %i | %s\n", (int)data.size(), data.data());
    int r = 0;

    std::lock_guard<std::mutex> lock(_mutex);
    _buffer.append(data.data(), data.size());
    _buffer_size += data.size();

    if (_buffer_size > _env->_write_buffer_size) {
      r = _SyncLocked();
    }

    LOG_DEBUG("[OUT] %i\n", r);
    return err_to_status(r);
  }

  using WritableFile::PositionedAppend;
  /**
   * @brief not supported
   * @details [long description]
   * @return [description]
   */
  Status PositionedAppend(const Slice& /* data */,
                          uint64_t /* offset */) override {
    return Status::NotSupported();
  }

  /**
   * @brief truncate file to assigned size
   * @details [long description]
   *
   * @param size [description]
   * @return [description]
   */
  Status Truncate(uint64_t size) override {
    LOG_DEBUG("[IN]%lld|%lld|%lld\n", (long long)size, (long long)_file_size, (long long)_buffer_size);
    int r = 0;

    std::lock_guard<std::mutex> lock(_mutex);
    if (_file_size > size) {
      r = _io_ctx->trunc(_fid, size);

      if (r == 0) {
        _buffer.clear();
        _buffer_size = 0;
        _file_size = size;
      }
    } else if (_file_size == size) {
      _buffer.clear();
      _buffer_size = 0;
    } else {
      librados::bufferlist tmp;
      tmp.claim(_buffer);
      _buffer.substr_of(tmp, 0, size - _file_size);
      _buffer_size = size - _file_size;
    }

    LOG_DEBUG("[OUT] %i\n", r);
    return err_to_status(r);
  }

  /**
   * @brief close file
   * @details [long description]
   * @return [description]
   */
  Status Close() override {
    LOG_DEBUG("%s | %lld | %lld\n", _hint.c_str(), (long long)_buffer_size, (long long)_file_size);
    return Sync();
  }

  /**
   * @brief flush file,
   * @details initiate an aio write and not wait
   *
   * @return [description]
   */
  Status Flush() override {
    librados::AioCompletion *write_completion = librados::Rados::aio_create_completion();
    int r = 0;

    std::lock_guard<std::mutex> lock(_mutex);
    r = _io_ctx->aio_append(_fid, write_completion, _buffer, _buffer_size);

    if (0 == r) {
      _file_size += _buffer_size;
      _buffer.clear();
      _buffer_size = 0;
    }

    write_completion->release();

    return err_to_status(r);
  }

  /**
   * @brief write buffer data to rados
   * @details initiate an aio write and wait for result
   * @return [description]
   */
  Status Sync() override {  // sync data
    int r = 0;

    std::lock_guard<std::mutex> lock(_mutex);
    if (_buffer_size > 0) {
      r = _SyncLocked();
    }

    return err_to_status(r);
  }

  /**
   * @brief [brief description]
   * @details [long description]
   * @return true if Sync() and Fsync() are safe to call concurrently with Append()and Flush().
   */
  bool IsSyncThreadSafe() const override { return true; }

  /**
   * @brief Indicates the upper layers if the current WritableFile implementation uses direct IO.
   * @details [long description]
   * @return [description]
   */
  bool use_direct_io() const override { return false; }

  /**
   * @brief Get file size
   * @details
   *  This API will use cached file_size.
   * @return [description]
   */
  uint64_t GetFileSize() override {
    LOG_DEBUG("%lld|%lld\n", (long long)_buffer_size, (long long)_file_size);

    std::lock_guard<std::mutex> lock(_mutex);
    int file_size = _file_size + _buffer_size;

    return file_size;
  }

  /**
   * @brief For documentation, refer to RandomAccessFile::GetUniqueId()
   * @details [long description]
   *
   * @param id [description]
   * @param max_size [description]
   *
   * @return [description]
   */
  size_t GetUniqueId(char* id, size_t max_size) const override {
    // All fid has the same db_id prefix, so we need to ignore db_id prefix
    size_t s = std::min(max_size, _fid.size());
    strncpy(id, _fid.c_str() + (_fid.size() - s), s);
    id[s - 1] = '\0';
    return s;
  }

  /**
   * @brief noop
   * @details [long description]
   *
   * @param offset [description]
   * @param length [description]
   *
   * @return [description]
   */
  Status InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
    return Status::OK();
  }

  /**
   * @brief No RangeSync support, just call Sync()
   * @details [long description]
   *
   * @param offset [description]
   * @param nbytes [description]
   *
   * @return [description]
   */
  Status RangeSync(uint64_t /*offset*/, uint64_t /*nbytes*/) override {
    return Sync();
  }

 protected:
  /**
   * @brief noop
   * @details [long description]
   *
   * @param offset [description]
   * @param len [description]
   *
   * @return [description]
   */
  Status Allocate(uint64_t /*offset*/, uint64_t /*len*/) override {
    return Status::OK();
  }
};


// Directory object represents collection of files and implements
// filesystem operations that can be executed on directories.
class LibradosDirectory : public Directory {
  std::string _fid;

 public:
  explicit LibradosDirectory(librados::IoCtx* /*io_ctx*/, std::string fid)
      : _fid(fid) {}

  // Fsync directory. Can be called concurrently from multiple threads.
  Status Fsync() { return Status::OK(); }
};

// Identifies a locked file.
// This is exclusive lock and can't nested lock by same thread
class LibradosFileLock : public FileLock {
  librados::IoCtx * _io_ctx;
  const std::string _obj_name;
  const std::string _lock_name;
  const std::string _cookie;

 public:
  LibradosFileLock(
    librados::IoCtx * io_ctx,
    const std::string obj_name):
    _io_ctx(io_ctx),
    _obj_name(obj_name),
    _lock_name("lock_name"),
    _cookie("cookie") {

    // TODO: the lock will never expire. It may cause problem if the process crash or abnormally exit.
    while (!_io_ctx->lock_exclusive(
             _obj_name,
             _lock_name,
             _cookie,
             "description", nullptr, 0));
  }

  ~LibradosFileLock() {
    _io_ctx->unlock(_obj_name, _lock_name, _cookie);
  }
};


// --------------------
// --- EnvLibrados ----
// --------------------
/**
 * @brief EnvLibrados ctor
 * @details [long description]
 *
 * @param db_name unique database name
 * @param config_path the configure file path for rados
 */
EnvLibrados::EnvLibrados(const std::string& db_name,
                         const std::string& config_path,
                         const std::string& db_pool)
  : EnvLibrados("client.admin",
                "ceph",
                0,
                db_name,
                config_path,
                db_pool,
                "/wal",
                db_pool,
                1 << 20) {}

/**
 * @brief EnvLibrados ctor
 * @details [long description]
 *
 * @param client_name       first 3 parameters is for RADOS client init
 * @param cluster_name
 * @param flags
 * @param db_name           unique database name, used as db_id key
 * @param config_path the   configure file path for rados
 * @param db_pool the pool  for db data
 * @param wal_pool the pool for WAL data
 * @param write_buffer_size WritableFile buffer max size
 */
EnvLibrados::EnvLibrados(const std::string& client_name,
                         const std::string& cluster_name,
                         const uint64_t flags,
                         const std::string& db_name,
                         const std::string& config_path,
                         const std::string& db_pool,
                         const std::string& wal_dir,
                         const std::string& wal_pool,
                         const uint64_t write_buffer_size)
  : EnvWrapper(Env::Default()),
    _client_name(client_name),
    _cluster_name(cluster_name),
    _flags(flags),
    _db_name(db_name),
    _config_path(config_path),
    _db_pool_name(db_pool),
    _wal_dir(wal_dir),
    _wal_pool_name(wal_pool),
    _write_buffer_size(write_buffer_size) {
  int ret = 0;

  // 1. create a Rados object and initialize it
  ret = _rados.init2(_client_name.c_str(), _cluster_name.c_str(), _flags); // just use the client.admin keyring
  if (ret < 0) { // let's handle any error that might have come back
    std::cerr << "couldn't initialize rados! error " << ret << std::endl;
    ret = EXIT_FAILURE;
    goto out;
  }

  // 2. read configure file
  ret = _rados.conf_read_file(_config_path.c_str());
  if (ret < 0) {
    // This could fail if the config file is malformed, but it'd be hard.
    std::cerr << "failed to parse config file " << _config_path
              << "! error" << ret << std::endl;
    ret = EXIT_FAILURE;
    goto out;
  }

  // 3. we actually connect to the cluster
  ret = _rados.connect();
  if (ret < 0) {
    std::cerr << "couldn't connect to cluster! error " << ret << std::endl;
    ret = EXIT_FAILURE;
    goto out;
  }

  // 4. create db_pool if not exist
  ret = _rados.pool_create(_db_pool_name.c_str());
  if (ret < 0 && ret != -EEXIST && ret !=  -EPERM) {
    std::cerr << "couldn't create pool! error " << ret << std::endl;
    goto out;
  }

  // 5. create db_pool_ioctx
  ret = _rados.ioctx_create(_db_pool_name.c_str(), _db_pool_ioctx);
  if (ret < 0) {
    std::cerr << "couldn't set up ioctx! error " << ret << std::endl;
    ret = EXIT_FAILURE;
    goto out;
  }

  // 6. create wal_pool if not exist
  ret = _rados.pool_create(_wal_pool_name.c_str());
  if (ret < 0 && ret != -EEXIST && ret !=  -EPERM) {
    std::cerr << "couldn't create pool! error " << ret << std::endl;
    goto out;
  }

  // 7. create wal_pool_ioctx
  ret = _rados.ioctx_create(_wal_pool_name.c_str(), _wal_pool_ioctx);
  if (ret < 0) {
    std::cerr << "couldn't set up ioctx! error " << ret << std::endl;
    ret = EXIT_FAILURE;
    goto out;
  }

  // 8. add root dir
  _AddFid(ROOT_DIR_KEY, DIR_ID_VALUE);

out:
  LOG_DEBUG("rados connect result code : %i\n", ret);
}

/****************************************************
  private functions to handle fid operation.
  Dir also have fid, but the value is DIR_ID_VALUE
****************************************************/

/**
 * @brief generate a new fid
 * @details [long description]
 * @return [description]
 */
std::string EnvLibrados::_CreateFid() {
  return _db_name + "." + GenerateUniqueId();
}

/**
 * @brief get fid
 * @details [long description]
 *
 * @param fname [description]
 * @param fid [description]
 *
 * @return
 *  Status::OK()
 *  Status::NotFound()
 */
Status EnvLibrados::_GetFid(
  const std::string &fname,
  std::string& fid) {
  std::set<std::string> keys;
  std::map<std::string, librados::bufferlist> kvs;
  keys.insert(fname);
  int r = _db_pool_ioctx.omap_get_vals_by_keys(_db_name, keys, &kvs);

  if (0 == r && 0 == kvs.size()) {
    return Status::NotFound();
  } else if (0 == r && 0 != kvs.size()) {
    fid.assign(kvs[fname].c_str(), kvs[fname].length());
    return Status::OK();
  } else {
    return err_to_status(r);
  }
}

/**
 * @brief rename fid
 * @details Only modify object in rados once,
 * so this rename operation is atomic in term of rados
 *
 * @param old_fname [description]
 * @param new_fname [description]
 *
 * @return [description]
 */
Status EnvLibrados::_RenameFid(const std::string& old_fname,
                               const std::string& new_fname) {
  std::string fid;
  Status s = _GetFid(old_fname, fid);

  if (Status::OK() != s) {
    return s;
  }

  librados::bufferlist bl;
  std::set<std::string> keys;
  std::map<std::string, librados::bufferlist> kvs;
  librados::ObjectWriteOperation o;
  bl.append(fid);
  keys.insert(old_fname);
  kvs[new_fname] = bl;
  o.omap_rm_keys(keys);
  o.omap_set(kvs);
  int r = _db_pool_ioctx.operate(_db_name, &o);
  return err_to_status(r);
}

/**
 * @brief add <file path, fid> to metadata object. It may overwrite exist key.
 * @details [long description]
 *
 * @param fname [description]
 * @param fid [description]
 *
 * @return [description]
 */
Status EnvLibrados::_AddFid(
  const std::string& fname,
  const std::string& fid) {
  std::map<std::string, librados::bufferlist> kvs;
  librados::bufferlist value;
  value.append(fid);
  kvs[fname] = value;
  int r = _db_pool_ioctx.omap_set(_db_name, kvs);
  return err_to_status(r);
}

/**
 * @brief return subfile names of dir.
 * @details
 *  RocksDB has a 2-level structure, so all keys
 *  that have dir as prefix are subfiles of dir.
 *  So we can just return these files' name.
 *
 * @param dir [description]
 * @param result [description]
 *
 * @return [description]
 */
Status EnvLibrados::_GetSubFnames(
  const std::string& dir,
  std::vector<std::string> * result
) {
  std::string start_after(dir);
  std::string filter_prefix(dir);
  std::map<std::string, librados::bufferlist> kvs;
  _db_pool_ioctx.omap_get_vals(_db_name,
                               start_after, filter_prefix,
                               MAX_ITEMS_IN_FS, &kvs);

  result->clear();
  for (auto i = kvs.begin(); i != kvs.end(); i++) {
    result->push_back(i->first.substr(dir.size() + 1));
  }
  return Status::OK();
}

/**
 * @brief delete key fname from metadata object
 * @details [long description]
 *
 * @param fname [description]
 * @return [description]
 */
Status EnvLibrados::_DelFid(
  const std::string& fname) {
  std::set<std::string> keys;
  keys.insert(fname);
  int r = _db_pool_ioctx.omap_rm_keys(_db_name, keys);
  return err_to_status(r);
}

/**
 * @brief get match IoCtx from _prefix_pool_map
 * @details [long description]
 *
 * @param prefix [description]
 * @return [description]
 *
 */
librados::IoCtx* EnvLibrados::_GetIoctx(const std::string& fpath) {
  auto is_prefix = [](const std::string & s1, const std::string & s2) {
    auto it1 = s1.begin(), it2 = s2.begin();
    while (it1 != s1.end() && it2 != s2.end() && *it1 == *it2) ++it1, ++it2;
    return it1 == s1.end();
  };

  if (is_prefix(_wal_dir, fpath)) {
    return &_wal_pool_ioctx;
  } else {
    return &_db_pool_ioctx;
  }
}

/************************************************************
                public functions
************************************************************/
/**
 * @brief create a new sequential read file handler
 * @details it will check the existence of fname
 *
 * @param fname [description]
 * @param result [description]
 * @param options [description]
 * @return [description]
 */
Status EnvLibrados::NewSequentialFile(const std::string& fname,
                                      std::unique_ptr<SequentialFile>* result,
                                      const EnvOptions& /*options*/) {
  LOG_DEBUG("[IN]%s\n", fname.c_str());
  std::string dir, file, fid;
  split(fname, &dir, &file);
  Status s;
  std::string fpath = dir + "/" + file;
  do {
    s = _GetFid(dir, fid);

    if (!s.ok() || fid != DIR_ID_VALUE) {
      if (fid != DIR_ID_VALUE) s = Status::IOError();
      break;
    }

    s = _GetFid(fpath, fid);

    if (Status::NotFound() == s) {
      s = Status::IOError();
      errno = ENOENT;
      break;
    }

    result->reset(new LibradosSequentialFile(_GetIoctx(fpath), fid, fpath));
    s = Status::OK();
  } while (0);

  LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  return s;
}

/**
 * @brief create a new random access file handler
 * @details it will check the existence of fname
 *
 * @param fname [description]
 * @param result [description]
 * @param options [description]
 * @return [description]
 */
Status EnvLibrados::NewRandomAccessFile(
    const std::string& fname, std::unique_ptr<RandomAccessFile>* result,
    const EnvOptions& /*options*/) {
  LOG_DEBUG("[IN]%s\n", fname.c_str());
  std::string dir, file, fid;
  split(fname, &dir, &file);
  Status s;
  std::string fpath = dir + "/" + file;
  do {
    s = _GetFid(dir, fid);

    if (!s.ok() || fid != DIR_ID_VALUE) {
      s = Status::IOError();
      break;
    }

    s = _GetFid(fpath, fid);

    if (Status::NotFound() == s) {
      s = Status::IOError();
      errno = ENOENT;
      break;
    }

    result->reset(new LibradosRandomAccessFile(_GetIoctx(fpath), fid, fpath));
    s = Status::OK();
  } while (0);

  LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  return s;
}

/**
 * @brief create a new write file handler
 * @details it will check the existence of fname
 *
 * @param fname [description]
 * @param result [description]
 * @param options [description]
 * @return [description]
 */
Status EnvLibrados::NewWritableFile(
  const std::string& fname,
  std::unique_ptr<WritableFile>* result,
  const EnvOptions& options)
{
  LOG_DEBUG("[IN]%s\n", fname.c_str());
  std::string dir, file, fid;
  split(fname, &dir, &file);
  Status s;
  std::string fpath = dir + "/" + file;

  do {
    // 1. check if dir exist
    s = _GetFid(dir, fid);
    if (!s.ok()) {
      break;
    }

    if (fid != DIR_ID_VALUE) {
      s = Status::IOError();
      break;
    }

    // 2. check if file exist.
    // 2.1 exist, use it
    // 2.2 not exist, create it
    s = _GetFid(fpath, fid);
    if (Status::NotFound() == s) {
      fid = _CreateFid();
      _AddFid(fpath, fid);
    }

    result->reset(
        new LibradosWritableFile(_GetIoctx(fpath), fid, fpath, this, options));
    s = Status::OK();
  } while (0);

  LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  return s;
}

/**
 * @brief reuse write file handler
 * @details
 *  This function will rename old_fname to new_fname,
 *  then return the handler of new_fname
 *
 * @param new_fname [description]
 * @param old_fname [description]
 * @param result [description]
 * @param options [description]
 * @return [description]
 */
Status EnvLibrados::ReuseWritableFile(
  const std::string& new_fname,
  const std::string& old_fname,
  std::unique_ptr<WritableFile>* result,
  const EnvOptions& options)
{
  LOG_DEBUG("[IN]%s => %s\n", old_fname.c_str(), new_fname.c_str());
  std::string src_fid, tmp_fid, src_dir, src_file, dst_dir, dst_file;
  split(old_fname, &src_dir, &src_file);
  split(new_fname, &dst_dir, &dst_file);

  std::string src_fpath = src_dir + "/" + src_file;
  std::string dst_fpath = dst_dir + "/" + dst_file;
  Status r = Status::OK();
  do {
    r = _RenameFid(src_fpath,
                   dst_fpath);
    if (!r.ok()) {
      break;
    }

    result->reset(new LibradosWritableFile(_GetIoctx(dst_fpath), src_fid,
                                           dst_fpath, this, options));
  } while (0);

  LOG_DEBUG("[OUT]%s\n", r.ToString().c_str());
  return r;
}

/**
 * @brief create a new directory handler
 * @details [long description]
 *
 * @param name [description]
 * @param result [description]
 *
 * @return [description]
 */
Status EnvLibrados::NewDirectory(
  const std::string& name,
  std::unique_ptr<Directory>* result)
{
  LOG_DEBUG("[IN]%s\n", name.c_str());
  std::string fid, dir, file;
  /* just want to get dir name */
  split(name + "/tmp", &dir, &file);
  Status s;

  do {
    s = _GetFid(dir, fid);

    if (!s.ok() || DIR_ID_VALUE != fid) {
      s = Status::IOError(name, strerror(-ENOENT));
      break;
    }

    if (Status::NotFound() == s) {
      s = _AddFid(dir, DIR_ID_VALUE);
      if (!s.ok()) break;
    } else if (!s.ok()) {
      break;
    }

    result->reset(new LibradosDirectory(_GetIoctx(dir), dir));
    s = Status::OK();
  } while (0);

  LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  return s;
}

/**
 * @brief check if fname is exist
 * @details [long description]
 *
 * @param fname [description]
 * @return [description]
 */
Status EnvLibrados::FileExists(const std::string& fname)
{
  LOG_DEBUG("[IN]%s\n", fname.c_str());
  std::string fid, dir, file;
  split(fname, &dir, &file);
  Status s = _GetFid(dir + "/" + file, fid);

  if (s.ok() && fid != DIR_ID_VALUE) {
    s = Status::OK();
  }

  LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  return s;
}

/**
 * @brief get subfile name of dir_in
 * @details [long description]
 *
 * @param dir_in [description]
 * @param result [description]
 *
 * @return [description]
 */
Status EnvLibrados::GetChildren(
  const std::string& dir_in,
  std::vector<std::string>* result)
{
  LOG_DEBUG("[IN]%s\n", dir_in.c_str());
  std::string fid, dir, file;
  split(dir_in + "/temp", &dir, &file);
  Status s;

  do {
    s = _GetFid(dir, fid);
    if (!s.ok()) {
      break;
    }

    if (fid != DIR_ID_VALUE) {
      s = Status::IOError();
      break;
    }

    s = _GetSubFnames(dir, result);
  } while (0);

  LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  return s;
}

/**
 * @brief delete fname
 * @details [long description]
 *
 * @param fname [description]
 * @return [description]
 */
Status EnvLibrados::DeleteFile(const std::string& fname)
{
  LOG_DEBUG("[IN]%s\n", fname.c_str());
  std::string fid, dir, file;
  split(fname, &dir, &file);
  Status s = _GetFid(dir + "/" + file, fid);

  if (s.ok() && DIR_ID_VALUE != fid) {
    s = _DelFid(dir + "/" + file);
  } else {
    s = Status::NotFound();
  }
  LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  return s;
}

/**
 * @brief create new dir
 * @details [long description]
 *
 * @param dirname [description]
 * @return [description]
 */
Status EnvLibrados::CreateDir(const std::string& dirname)
{
  LOG_DEBUG("[IN]%s\n", dirname.c_str());
  std::string fid, dir, file;
  split(dirname + "/temp", &dir, &file);
  Status s = _GetFid(dir + "/" + file, fid);

  do {
    if (Status::NotFound() != s && fid != DIR_ID_VALUE) {
      break;
    } else if (Status::OK() == s && fid == DIR_ID_VALUE) {
      break;
    }

    s = _AddFid(dir, DIR_ID_VALUE);
  } while (0);

  LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  return s;
}

/**
 * @brief create dir if missing
 * @details [long description]
 *
 * @param dirname [description]
 * @return [description]
 */
Status EnvLibrados::CreateDirIfMissing(const std::string& dirname)
{
  LOG_DEBUG("[IN]%s\n", dirname.c_str());
  std::string fid, dir, file;
  split(dirname + "/temp", &dir, &file);
  Status s = Status::OK();

  do {
    s = _GetFid(dir, fid);
    if (Status::NotFound() != s) {
      break;
    }

    s = _AddFid(dir, DIR_ID_VALUE);
  } while (0);

  LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  return s;
}

/**
 * @brief delete dir
 * @details
 *
 * @param dirname [description]
 * @return [description]
 */
Status EnvLibrados::DeleteDir(const std::string& dirname)
{
  LOG_DEBUG("[IN]%s\n", dirname.c_str());
  std::string fid, dir, file;
  split(dirname + "/temp", &dir, &file);
  Status s = Status::OK();

  s = _GetFid(dir, fid);

  if (s.ok() && DIR_ID_VALUE == fid) {
    std::vector<std::string> subs;
    s = _GetSubFnames(dir, &subs);
    // if subfiles exist, can't delete dir
    if (subs.size() > 0) {
      s = Status::IOError();
    } else {
      s = _DelFid(dir);
    }
  } else {
    s = Status::NotFound();
  }

  LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  return s;
}

/**
 * @brief return file size
 * @details [long description]
 *
 * @param fname [description]
 * @param file_size [description]
 *
 * @return [description]
 */
Status EnvLibrados::GetFileSize(
  const std::string& fname,
  uint64_t* file_size)
{
  LOG_DEBUG("[IN]%s\n", fname.c_str());
  std::string fid, dir, file;
  split(fname, &dir, &file);
  time_t mtime;
  Status s;

  do {
    std::string fpath = dir + "/" + file;
    s = _GetFid(fpath, fid);

    if (!s.ok()) {
      break;
    }

    int ret = _GetIoctx(fpath)->stat(fid, file_size, &mtime);
    if (ret < 0) {
      LOG_DEBUG("%i\n", ret);
      if (-ENOENT == ret) {
        *file_size = 0;
        s = Status::OK();
      } else {
        s = err_to_status(ret);
      }
    } else {
      s = Status::OK();
    }
  } while (0);

  LOG_DEBUG("[OUT]%s|%lld\n", s.ToString().c_str(), (long long)*file_size);
  return s;
}

/**
 * @brief get file modification time
 * @details [long description]
 *
 * @param fname [description]
 * @param file_mtime [description]
 *
 * @return [description]
 */
Status EnvLibrados::GetFileModificationTime(const std::string& fname,
    uint64_t* file_mtime)
{
  LOG_DEBUG("[IN]%s\n", fname.c_str());
  std::string fid, dir, file;
  split(fname, &dir, &file);
  time_t mtime;
  uint64_t file_size;
  Status s = Status::OK();
  do {
    std::string fpath = dir + "/" + file;
    s = _GetFid(dir + "/" + file, fid);

    if (!s.ok()) {
      break;
    }

    int ret = _GetIoctx(fpath)->stat(fid, &file_size, &mtime);
    if (ret < 0) {
      if (Status::NotFound() == err_to_status(ret)) {
        *file_mtime = static_cast<uint64_t>(mtime);
        s = Status::OK();
      } else {
        s = err_to_status(ret);
      }
    } else {
      s = Status::OK();
    }
  } while (0);

  LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  return s;
}

/**
 * @brief rename file
 * @details
 *
 * @param src [description]
 * @param target_in [description]
 *
 * @return [description]
 */
Status EnvLibrados::RenameFile(
  const std::string& src,
  const std::string& target_in)
{
  LOG_DEBUG("[IN]%s => %s\n", src.c_str(), target_in.c_str());
  std::string src_fid, tmp_fid, src_dir, src_file, dst_dir, dst_file;
  split(src, &src_dir, &src_file);
  split(target_in, &dst_dir, &dst_file);

  auto s = _RenameFid(src_dir + "/" + src_file,
                      dst_dir + "/" + dst_file);
  LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  return s;
}

/**
 * @brief not support
 * @details [long description]
 *
 * @param src [description]
 * @param target_in [description]
 *
 * @return [description]
 */
Status EnvLibrados::LinkFile(
  const std::string& src,
  const std::string& target_in)
{
  (void)src;
  (void)target_in;
  LOG_DEBUG("[IO]%s => %s\n", src.c_str(), target_in.c_str());
  return Status::NotSupported();
}

/**
 * @brief lock file. create if missing.
 * @details [long description]
 *
 * It seems that LockFile is used for preventing other instance of RocksDB
 * from opening up the database at the same time. From RocksDB source code,
 * the invokes of LockFile are at following locations:
 *
 *  ./db/db_impl.cc:1159:    s = env_->LockFile(LockFileName(dbname_), &db_lock_);    // DBImpl::Recover
 *  ./db/db_impl.cc:5839:  Status result = env->LockFile(lockname, &lock);            // Status DestroyDB
 *
 * When db recovery and db destroy, RocksDB will call LockFile
 *
 * @param fname [description]
 * @param lock [description]
 *
 * @return [description]
 */
Status EnvLibrados::LockFile(
  const std::string& fname,
  FileLock** lock)
{
  LOG_DEBUG("[IN]%s\n", fname.c_str());
  std::string fid, dir, file;
  split(fname, &dir, &file);
  Status s = Status::OK();

  do {
    std::string fpath = dir + "/" + file;
    s = _GetFid(fpath, fid);

    if (Status::OK() != s &&
        Status::NotFound() != s) {
      break;
    } else if (Status::NotFound() == s) {
      s = _AddFid(fpath, _CreateFid());
      if (!s.ok()) {
        break;
      }
    } else if (Status::OK() == s && DIR_ID_VALUE == fid) {
      s = Status::IOError();
      break;
    }

    *lock = new LibradosFileLock(_GetIoctx(fpath), fpath);
  } while (0);

  LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  return s;
}

/**
 * @brief unlock file
 * @details [long description]
 *
 * @param lock [description]
 * @return [description]
 */
Status EnvLibrados::UnlockFile(FileLock* lock)
{
  LOG_DEBUG("[IO]%p\n", lock);
  if (nullptr != lock) {
    delete lock;
  }
  return Status::OK();
}


/**
 * @brief not support
 * @details [long description]
 *
 * @param db_path [description]
 * @param output_path [description]
 *
 * @return [description]
 */
Status EnvLibrados::GetAbsolutePath(const std::string& db_path,
                                    std::string* /*output_path*/) {
  (void)db_path;
  LOG_DEBUG("[IO]%s\n", db_path.c_str());
  return Status::NotSupported();
}

/**
 * @brief Get default EnvLibrados
 * @details [long description]
 * @return [description]
 */
EnvLibrados* EnvLibrados::Default() {
  static EnvLibrados default_env(default_db_name,
                                 std::getenv(default_config_path),
                                 default_pool_name);
  return &default_env;
}

}  // namespace ROCKSDB_NAMESPACE
back to top