Revision 58b12dfe37784ce1bd67a03643147ab94f590451 authored by Reid Horuff on 06 March 2017, 23:04:36 UTC, committed by Facebook Github Bot on 06 March 2017, 23:09:11 UTC
Summary:
Relating to #1903:

In MaybeFlushColumnFamilies() we want to modify the 'getting_flushed' flag before releasing the db mutex when SwitchMemtable() is called.

Each of the following two sequences of steps needs to be atomic in MaybeFlushColumnFamilies():
- getting_flushed is false on oldest log
- we determine that all CFs can be flushed to successfully release oldest log
- we set getting_flushed = true on the oldest log.
-------
- getting_flushed is false on oldest log
- we determine that all CFs can NOT be flushed to successfully release oldest log
- we set unable_to_flush_oldest_log_ = true on the oldest log.

#### In the 2pc case:

T1 enters function but is unable to flush all CFs to release log
T1 sets unable_to_flush_oldest_log_ = true
T1 begins flushing all CFs possible

T2 enters function but is unable to flush all CFs to release log
T2 sees unable_to_flush_oldest_log_ has been set, so it exits

T3 enters function and will be able to flush all CFs to release oldest log
T3 sets getting_flushed = true on the oldest log
Closes https://github.com/facebook/rocksdb/pull/1909

Differential Revision: D4646235

Pulled By: reidHoruff

fbshipit-source-id: c8d0447
1 parent f8a4ea0
Raw File
partitioned_filter_block.cc
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.

#include "table/partitioned_filter_block.h"

#include "port/port.h"
#include "util/coding.h"

namespace rocksdb {
// Constructs a partitioned index builder.
//
// comparator                   - forwarded to the IndexBuilder base class.
// prefix_extractor             - stored and handed to every sub-index builder
//                                created by this object.
// index_per_partition          - number of index entries after which
//                                AddIndexEntry() closes the current partition.
// index_block_restart_interval - used for the second-level index block
//                                builder and for every sub-index builder.
PartitionIndexBuilder::PartitionIndexBuilder(
    const InternalKeyComparator* comparator,
    const SliceTransform* prefix_extractor, const uint64_t index_per_partition,
    int index_block_restart_interval)
    : IndexBuilder(comparator),
      prefix_extractor_(prefix_extractor),
      index_block_builder_(index_block_restart_interval),
      index_per_partition_(index_per_partition),
      index_block_restart_interval_(index_block_restart_interval) {
  // Open the first partition up front so that the first AddIndexEntry() call
  // has a sub-builder to forward to.
  auto first_partition_builder =
      CreateIndexBuilder(sub_type_, comparator_, prefix_extractor_,
                         index_block_restart_interval_, index_per_partition_);
  sub_index_builder_ = first_partition_builder;
}

// Releases the partition that is still under construction, if any. Partitions
// that were already closed are held through std::unique_ptr inside entries_
// (see AddIndexEntry) and are not deleted here.
PartitionIndexBuilder::~PartitionIndexBuilder() {
  // Deleting a null pointer is a no-op, so no explicit check is needed once
  // AddIndexEntry() has reset sub_index_builder_ after the last key.
  delete sub_index_builder_;
}

void PartitionIndexBuilder::AddIndexEntry(
    std::string* last_key_in_current_block,
    const Slice* first_key_in_next_block, const BlockHandle& block_handle) {
  sub_index_builder_->AddIndexEntry(last_key_in_current_block,
                                    first_key_in_next_block, block_handle);
  num_indexes++;
  if (UNLIKELY(first_key_in_next_block == nullptr)) {  // no more keys
    entries_.push_back({std::string(*last_key_in_current_block),
                        std::unique_ptr<IndexBuilder>(sub_index_builder_)});
    sub_index_builder_ = nullptr;
  } else if (num_indexes % index_per_partition_ == 0) {
    entries_.push_back({std::string(*last_key_in_current_block),
                        std::unique_ptr<IndexBuilder>(sub_index_builder_)});
    sub_index_builder_ =
        CreateIndexBuilder(sub_type_, comparator_, prefix_extractor_,
                           index_block_restart_interval_, index_per_partition_);
  }
}

// Emits the index one block per call.
//
// While closed partitions remain, each call finishes the partition at the
// front of entries_ into *index_blocks and returns Status::Incomplete() (or
// the partition's own error status) to signal that another call is expected.
// On every call after the first, last_partition_block_handle is encoded and
// recorded in the second-level index under the key of the partition finished
// by the previous call, and that partition is dropped. Once no partitions
// remain, the second-level index is written to
// index_blocks->index_block_contents and Status::OK() is returned.
Status PartitionIndexBuilder::Finish(
    IndexBlocks* index_blocks, const BlockHandle& last_partition_block_handle) {
  assert(!entries_.empty());
  // AddIndexEntry() resets the open sub-builder to null after the last key.
  assert(sub_index_builder_ == nullptr);

  if (finishing) {
    // A previous call already finished the front partition; register its
    // handle in the second-level index and retire it.
    std::string encoded_handle;
    last_partition_block_handle.EncodeTo(&encoded_handle);
    index_block_builder_.Add(entries_.front().key, encoded_handle);
    entries_.pop_front();
  }

  if (UNLIKELY(entries_.empty())) {
    // Every partition has been emitted: hand back the second-level index.
    index_blocks->index_block_contents = index_block_builder_.Finish();
    return Status::OK();
  }

  // Finish the next partition in line; Incomplete() tells the caller to call
  // Finish again.
  const auto partition_status = entries_.front().value->Finish(index_blocks);
  finishing = true;
  if (!partition_status.ok()) {
    return partition_status;
  }
  return Status::Incomplete();
}

// Returns the sum of the size estimates of every closed partition in
// entries_, the second-level index block builder, and the partition currently
// under construction (if there is one).
size_t PartitionIndexBuilder::EstimatedSize() const {
  size_t estimate = 0;
  for (const auto& closed_partition : entries_) {
    estimate += closed_partition.value->EstimatedSize();
  }
  estimate += index_block_builder_.CurrentSizeEstimate();
  // sub_index_builder_ is null once the last key has been added.
  if (sub_index_builder_ != nullptr) {
    estimate += sub_index_builder_->EstimatedSize();
  }
  return estimate;
}

}  // namespace rocksdb
back to top