Revision 1f32dc7d2b6721a1fe13eb515d52e5cd6f110f59 authored by Andrew Kryczka on 14 June 2018, 00:28:31 UTC, committed by Facebook Github Bot on 14 June 2018, 00:32:04 UTC
Summary:
Rebased and resubmitting #1831 on behalf of stevelittle.

The problem is when a single process attempts to open the same DB twice, the second attempt fails due to LOCK file held. If the second attempt had opened the LOCK file, it'll now need to close it, and closing causes the file to be unlocked. Then, any subsequent attempt to open the DB will succeed, which is the wrong behavior.

The solution was to track which files a process has locked in PosixEnv, and check those before opening a LOCK file.

Fixes #1780.
Closes https://github.com/facebook/rocksdb/pull/3993

Differential Revision: D8398984

Pulled By: ajkr

fbshipit-source-id: 2755fe66950a0c9de63075f932f9e15768041918
1 parent 7497f99
Raw File
managed_iterator.cc
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "db/managed_iterator.h"

#include <limits>
#include <string>
#include <utility>

#include "db/column_family.h"
#include "db/db_impl.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "table/merging_iterator.h"

namespace rocksdb {

namespace {
// Helper class that locks a mutex on construction and unlocks the mutex when
// the destructor of the MutexLock object is invoked.
//
// Typical usage:
//
//   void MyClass::MyMethod() {
//     MILock l(&mu_);       // mu_ is an instance variable
//     ... some complex code, possibly with multiple return paths ...
//   }

class MILock {
 public:
  explicit MILock(std::mutex* mu, ManagedIterator* mi) : mu_(mu), mi_(mi) {
    this->mu_->lock();
  }
  ~MILock() {
    this->mu_->unlock();
  }
  ManagedIterator* GetManagedIterator() { return mi_; }

 private:
  std::mutex* const mu_;
  ManagedIterator* mi_;
  // No copying allowed
  MILock(const MILock&) = delete;
  void operator=(const MILock&) = delete;
};
}  // anonymous namespace

//
// Synchronization between modifiers, releasers, creators
// If iterator operation, wait till (!in_use), set in_use, do op, reset in_use
//  if modifying mutable_iter, atomically exchange in_use:
//  return if in_use set / otherwise set in use,
//  atomically replace new iter with old , reset in use
//  The releaser is the new operation and it holds a lock for a very short time
//  The existing non-const iterator operations are supposed to be single
//  threaded and hold the lock for the duration of the operation
//  The existing const iterator operations use the cached key/values
//  and don't do any locking.
ManagedIterator::ManagedIterator(DBImpl* db, const ReadOptions& read_options,
                                 ColumnFamilyData* cfd)
    : db_(db),
      read_options_(read_options),
      cfd_(cfd),
      svnum_(cfd->GetSuperVersionNumber()),
      mutable_iter_(nullptr),
      valid_(false),
      snapshot_created_(false),
      release_supported_(true) {
  read_options_.managed = false;
  if ((!read_options_.tailing) && (read_options_.snapshot == nullptr)) {
    assert(nullptr != (read_options_.snapshot = db_->GetSnapshot()));
    snapshot_created_ = true;
  }
  cfh_.SetCFD(cfd);
  mutable_iter_ = unique_ptr<Iterator>(db->NewIterator(read_options_, &cfh_));
}

ManagedIterator::~ManagedIterator() {
  Lock();
  if (snapshot_created_) {
    db_->ReleaseSnapshot(read_options_.snapshot);
    snapshot_created_ = false;
    read_options_.snapshot = nullptr;
  }
  UnLock();
}

bool ManagedIterator::Valid() const { return valid_; }

void ManagedIterator::SeekToLast() {
  MILock l(&in_use_, this);
  if (NeedToRebuild()) {
    RebuildIterator();
  }
  assert(mutable_iter_ != nullptr);
  mutable_iter_->SeekToLast();
  UpdateCurrent();
}

void ManagedIterator::SeekToFirst() {
  MILock l(&in_use_, this);
  SeekInternal(Slice(), true);
}

void ManagedIterator::Seek(const Slice& user_key) {
  MILock l(&in_use_, this);
  SeekInternal(user_key, false);
}

void ManagedIterator::SeekForPrev(const Slice& user_key) {
  MILock l(&in_use_, this);
  if (NeedToRebuild()) {
    RebuildIterator();
  }
  assert(mutable_iter_ != nullptr);
  mutable_iter_->SeekForPrev(user_key);
  UpdateCurrent();
}

void ManagedIterator::SeekInternal(const Slice& user_key, bool seek_to_first) {
  if (NeedToRebuild()) {
    RebuildIterator();
  }
  assert(mutable_iter_ != nullptr);
  if (seek_to_first) {
    mutable_iter_->SeekToFirst();
  } else {
    mutable_iter_->Seek(user_key);
  }
  UpdateCurrent();
}

void ManagedIterator::Prev() {
  if (!valid_) {
    status_ = Status::InvalidArgument("Iterator value invalid");
    return;
  }
  MILock l(&in_use_, this);
  if (NeedToRebuild()) {
    RebuildIterator(true);
    if (!valid_) {
      return;
    }
  }
  mutable_iter_->Prev();
  UpdateCurrent();
}

void ManagedIterator::Next() {
  if (!valid_) {
    status_ = Status::InvalidArgument("Iterator value invalid");
    return;
  }
  MILock l(&in_use_, this);
  if (NeedToRebuild()) {
    RebuildIterator(true);
    if (!valid_) {
      return;
    }
  }
  mutable_iter_->Next();
  UpdateCurrent();
}

Slice ManagedIterator::key() const {
  assert(valid_);
  return cached_key_.GetUserKey();
}

Slice ManagedIterator::value() const {
  assert(valid_);
  return cached_value_.GetUserKey();
}

Status ManagedIterator::status() const { return status_; }

void ManagedIterator::RebuildIterator(bool reseek) {
  std::string current_key;
  if (reseek) {
    current_key = key().ToString();
  }

  svnum_ = cfd_->GetSuperVersionNumber();
  mutable_iter_ = unique_ptr<Iterator>(db_->NewIterator(read_options_, &cfh_));

  if (reseek) {
    Slice old_key(current_key.data(), current_key.size());
    SeekInternal(old_key, false);
    UpdateCurrent();
    if (!valid_ || key().compare(old_key) != 0) {
      valid_ = false;
      status_ = Status::Incomplete(
          "Next/Prev failed because current key has "
          "been removed");
    }
  }
}

void ManagedIterator::UpdateCurrent() {
  assert(mutable_iter_ != nullptr);

  valid_ = mutable_iter_->Valid();
  status_ = mutable_iter_->status();

  if (!valid_) {
    return;
  }

  cached_key_.SetUserKey(mutable_iter_->key());
  cached_value_.SetUserKey(mutable_iter_->value());
}

void ManagedIterator::ReleaseIter(bool only_old) {
  if ((mutable_iter_ == nullptr) || (!release_supported_)) {
    return;
  }
  if (svnum_ != cfd_->GetSuperVersionNumber() || !only_old) {
    if (!TryLock()) {  // Don't release iter if in use
      return;
    }
    mutable_iter_ = nullptr;  // in_use for a very short time
    UnLock();
  }
}

bool ManagedIterator::NeedToRebuild() {
  if ((mutable_iter_ == nullptr) || (status_.IsIncomplete()) ||
      (!only_drop_old_ && (svnum_ != cfd_->GetSuperVersionNumber()))) {
    return true;
  }
  return false;
}

void ManagedIterator::Lock() {
  in_use_.lock();
  return;
}

bool ManagedIterator::TryLock() { return in_use_.try_lock(); }

void ManagedIterator::UnLock() {
  in_use_.unlock();
}

}  // namespace rocksdb

#endif  // ROCKSDB_LITE
back to top