Revision 1f32dc7d2b6721a1fe13eb515d52e5cd6f110f59 authored by Andrew Kryczka on 14 June 2018, 00:28:31 UTC, committed by Facebook Github Bot on 14 June 2018, 00:32:04 UTC
Summary:
Rebased and resubmitting #1831 on behalf of stevelittle.

The problem: when a single process attempts to open the same DB twice, the second attempt fails because the LOCK file is already held. But if that second attempt had opened the LOCK file, it now needs to close it, and closing the file causes it to be unlocked. After that, any subsequent attempt to open the DB succeeds, which is the wrong behavior.
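
To make the failure concrete, here is a minimal sketch of the scenario (the path and options are placeholders for illustration, not part of the patch):

#include <cassert>
#include "rocksdb/db.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::DB* db1 = nullptr;
  rocksdb::DB* db2 = nullptr;
  rocksdb::DB* db3 = nullptr;

  // First open succeeds and acquires the LOCK file.
  assert(rocksdb::DB::Open(options, "/tmp/lockdb_example", &db1).ok());

  // Second open in the same process fails because the lock is held.
  assert(!rocksdb::DB::Open(options, "/tmp/lockdb_example", &db2).ok());

  // Expected: this also fails while db1 is still open. Before the fix, the
  // failed second attempt closed the LOCK file it had opened, which released
  // the lock, so this open incorrectly succeeded.
  assert(!rocksdb::DB::Open(options, "/tmp/lockdb_example", &db3).ok());

  delete db1;
  return 0;
}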

The solution is to track, in PosixEnv, which lock files the process already holds, and to check that set before opening a LOCK file.
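
Roughly, the idea looks like the sketch below; the names are made up for illustration and are not the actual PosixEnv symbols:

#include <mutex>
#include <set>
#include <string>

// Process-wide bookkeeping of LOCK files held by this process.
static std::mutex locked_files_mutex;       // guards locked_files
static std::set<std::string> locked_files;  // lock files this process holds

// Returns false if this process already holds the lock. In that case the
// caller should fail LockFile() without ever opening (and later closing)
// the LOCK file.
bool MarkFileLocked(const std::string& fname) {
  std::lock_guard<std::mutex> guard(locked_files_mutex);
  return locked_files.insert(fname).second;
}

// Called only after a lock this process actually acquired is released.
void MarkFileUnlocked(const std::string& fname) {
  std::lock_guard<std::mutex> guard(locked_files_mutex);
  locked_files.erase(fname);
}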

Fixes #1780.
Closes https://github.com/facebook/rocksdb/pull/3993

Differential Revision: D8398984

Pulled By: ajkr

fbshipit-source-id: 2755fe66950a0c9de63075f932f9e15768041918
1 parent 7497f99
Raw File
db_impl_experimental.cc
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_impl.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <vector>

#include "db/column_family.h"
#include "db/job_context.h"
#include "db/version_set.h"
#include "rocksdb/status.h"

namespace rocksdb {

#ifndef ROCKSDB_LITE
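// Marks all files overlapping [begin, end] on every non-empty level except
// the last one as candidates for compaction, then recomputes compaction
// scores and schedules background work. This is only a hint; it does not
// force a compaction to run immediately.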
Status DBImpl::SuggestCompactRange(ColumnFamilyHandle* column_family,
                                   const Slice* begin, const Slice* end) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();
  InternalKey start_key, end_key;
  if (begin != nullptr) {
    start_key.SetMinPossibleForUserKey(*begin);
  }
  if (end != nullptr) {
    end_key.SetMaxPossibleForUserKey(*end);
  }
  {
    InstrumentedMutexLock l(&mutex_);
    auto vstorage = cfd->current()->storage_info();
    for (int level = 0; level < vstorage->num_non_empty_levels() - 1; ++level) {
      std::vector<FileMetaData*> inputs;
      vstorage->GetOverlappingInputs(
          level, begin == nullptr ? nullptr : &start_key,
          end == nullptr ? nullptr : &end_key, &inputs);
      for (auto f : inputs) {
        f->marked_for_compaction = true;
      }
    }
    // Since we have some more files to compact, we should also recompute
    // compaction score
    vstorage->ComputeCompactionScore(*cfd->ioptions(),
                                     *cfd->GetLatestMutableCFOptions());
    SchedulePendingCompaction(cfd);
    MaybeScheduleFlushOrCompaction();
  }
  return Status::OK();
}

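// Moves all L0 files to target_level via a metadata-only VersionEdit (no
// data is rewritten). Fails unless the L0 files have non-overlapping key
// ranges, none of them is being compacted, and levels 1..target_level are
// empty.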
Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
  assert(column_family);

  if (target_level < 1) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "PromoteL0 FAILED. Invalid target level %d\n", target_level);
    return Status::InvalidArgument("Invalid target level");
  }

  Status status;
  VersionEdit edit;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  {
    InstrumentedMutexLock l(&mutex_);
    auto* cfd = static_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
    const auto* vstorage = cfd->current()->storage_info();

    if (target_level >= vstorage->num_levels()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "PromoteL0 FAILED. Target level %d does not exist\n",
                     target_level);
      job_context.Clean();
      return Status::InvalidArgument("Target level does not exist");
    }

    // Sort L0 files by range.
    const InternalKeyComparator* icmp = &cfd->internal_comparator();
    auto l0_files = vstorage->LevelFiles(0);
    std::sort(l0_files.begin(), l0_files.end(),
              [icmp](FileMetaData* f1, FileMetaData* f2) {
                return icmp->Compare(f1->largest, f2->largest) < 0;
              });

    // Check that no L0 file is being compacted and that they have
    // non-overlapping ranges.
    for (size_t i = 0; i < l0_files.size(); ++i) {
      auto f = l0_files[i];
      if (f->being_compacted) {
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "PromoteL0 FAILED. File %" PRIu64 " being compacted\n",
                       f->fd.GetNumber());
        job_context.Clean();
        return Status::InvalidArgument("PromoteL0 called during L0 compaction");
      }

      if (i == 0) continue;
      auto prev_f = l0_files[i - 1];
      if (icmp->Compare(prev_f->largest, f->smallest) >= 0) {
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "PromoteL0 FAILED. Files %" PRIu64 " and %" PRIu64
                       " have overlapping ranges\n",
                       prev_f->fd.GetNumber(), f->fd.GetNumber());
        job_context.Clean();
        return Status::InvalidArgument("L0 has overlapping files");
      }
    }

    // Check that all levels up to target_level are empty.
    for (int level = 1; level <= target_level; ++level) {
      if (vstorage->NumLevelFiles(level) > 0) {
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "PromoteL0 FAILED. Level %d not empty\n", level);
        job_context.Clean();
        return Status::InvalidArgument(
            "All levels up to target_level "
            "must be empty");
      }
    }

    edit.SetColumnFamily(cfd->GetID());
    for (const auto& f : l0_files) {
      edit.DeleteFile(0, f->fd.GetNumber());
      edit.AddFile(target_level, f->fd.GetNumber(), f->fd.GetPathId(),
                   f->fd.GetFileSize(), f->smallest, f->largest,
                   f->smallest_seqno, f->largest_seqno,
                   f->marked_for_compaction);
    }

    status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                    &edit, &mutex_, directories_.GetDbDir());
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(
          cfd, &job_context.superversion_context,
          *cfd->GetLatestMutableCFOptions());
    }
  }  // lock released here
  LogFlush(immutable_db_options_.info_log);
  job_context.Clean();

  return status;
}
#endif  // ROCKSDB_LITE

}  // namespace rocksdb