//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/compaction_job.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <algorithm>
#include <vector>
#include <memory>
#include <list>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/event_logger_helpers.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/merge_helper.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/version_set.h"
#include "port/port.h"
#include "port/likely.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merger.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/log_buffer.h"
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
#include "util/iostats_context_imp.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/thread_status_util.h"

namespace rocksdb {

struct CompactionJob::CompactionState {
  Compaction* const compaction;

  // Files produced by compaction
  struct Output {
    uint64_t number;
    uint32_t path_id;
    uint64_t file_size;
    InternalKey smallest, largest;
    SequenceNumber smallest_seqno, largest_seqno;
  };
  std::vector<Output> outputs;

  // State kept for output being generated
  std::unique_ptr<WritableFile> outfile;
  std::unique_ptr<TableBuilder> builder;

  uint64_t total_bytes;

  Output* current_output() { return &outputs.back(); }

  explicit CompactionState(Compaction* c)
      : compaction(c),
        total_bytes(0),
        num_input_records(0),
        num_output_records(0) {}

  // Create a client-visible context of this compaction
  CompactionFilter::Context GetFilterContextV1() {
    CompactionFilter::Context context;
    context.is_full_compaction = compaction->IsFullCompaction();
    context.is_manual_compaction = compaction->IsManualCompaction();
    return context;
  }

  // Create a client-visible context of this compaction
  CompactionFilterContext GetFilterContext() {
    CompactionFilterContext context;
    context.is_full_compaction = compaction->IsFullCompaction();
    context.is_manual_compaction = compaction->IsManualCompaction();
    return context;
  }

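  // Scratch buffers used by compaction filter V2: kv-pairs are batched
  // per prefix, filtered in bulk, and then merged back with the
  // ineligible entries before being written out.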
  std::vector<std::string> key_str_buf_;
  std::vector<std::string> existing_value_str_buf_;
  // new_value_buf_ only receives entries whose values were changed
  std::vector<std::string> new_value_buf_;
  // if value_changed_buf_[i] is true, new_value_buf_ contains a new
  // entry with the changed value for key_str_buf_[i]
  std::vector<bool> value_changed_buf_;
  // to_delete_buf_[i] is true iff key_str_buf_[i] is to be deleted
  std::vector<bool> to_delete_buf_;

  std::vector<std::string> other_key_str_buf_;
  std::vector<std::string> other_value_str_buf_;

  std::vector<Slice> combined_key_buf_;
  std::vector<Slice> combined_value_buf_;

  std::string cur_prefix_;

  uint64_t num_input_records;
  uint64_t num_output_records;

  // Buffers a kv-pair that will later be run through compaction filter V2.
  void BufferKeyValueSlices(const Slice& key, const Slice& value) {
    key_str_buf_.emplace_back(key.ToString());
    existing_value_str_buf_.emplace_back(value.ToString());
  }

  // Buffers a kv-pair (an ineligible key) that will not be run through
  // compaction filter V2.
  void BufferOtherKeyValueSlices(const Slice& key, const Slice& value) {
    other_key_str_buf_.emplace_back(key.ToString());
    other_value_str_buf_.emplace_back(value.ToString());
  }

  // Add a kv-pair to the combined buffer
  void AddToCombinedKeyValueSlices(const Slice& key, const Slice& value) {
    // The real strings are stored in the batch buffers
    combined_key_buf_.emplace_back(key);
    combined_value_buf_.emplace_back(value);
  }

  // Merge the two sorted buffers into the combined buffers in key order
  void MergeKeyValueSliceBuffer(const InternalKeyComparator* comparator) {
    size_t i = 0;
    size_t j = 0;
    size_t total_size = key_str_buf_.size() + other_key_str_buf_.size();
    combined_key_buf_.reserve(total_size);
    combined_value_buf_.reserve(total_size);

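    // Classic two-way merge: on each step emit the smaller key from
    // whichever buffer holds it. Internal keys are unique (they embed
    // the sequence number), so comp_res is never 0 while both buffers
    // still have entries, and the loop always makes progress.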
    while (i + j < total_size) {
      int comp_res = 0;
      if (i < key_str_buf_.size() && j < other_key_str_buf_.size()) {
        comp_res = comparator->Compare(key_str_buf_[i], other_key_str_buf_[j]);
      } else if (i >= key_str_buf_.size() && j < other_key_str_buf_.size()) {
        comp_res = 1;
      } else if (j >= other_key_str_buf_.size() && i < key_str_buf_.size()) {
        comp_res = -1;
      }
      if (comp_res > 0) {
        AddToCombinedKeyValueSlices(other_key_str_buf_[j],
                                    other_value_str_buf_[j]);
        j++;
      } else if (comp_res < 0) {
        AddToCombinedKeyValueSlices(key_str_buf_[i],
                                    existing_value_str_buf_[i]);
        i++;
      }
    }
  }

  void CleanupBatchBuffer() {
    to_delete_buf_.clear();
    key_str_buf_.clear();
    existing_value_str_buf_.clear();
    new_value_buf_.clear();
    value_changed_buf_.clear();

    to_delete_buf_.shrink_to_fit();
    key_str_buf_.shrink_to_fit();
    existing_value_str_buf_.shrink_to_fit();
    new_value_buf_.shrink_to_fit();
    value_changed_buf_.shrink_to_fit();

    other_key_str_buf_.clear();
    other_value_str_buf_.clear();
    other_key_str_buf_.shrink_to_fit();
    other_value_str_buf_.shrink_to_fit();
  }

  void CleanupMergedBuffer() {
    combined_key_buf_.clear();
    combined_value_buf_.clear();
    combined_key_buf_.shrink_to_fit();
    combined_value_buf_.shrink_to_fit();
  }
};

CompactionJob::CompactionJob(
    int job_id, Compaction* compaction, const DBOptions& db_options,
    const EnvOptions& env_options, VersionSet* versions,
    std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
    Directory* db_directory, Directory* output_directory, Statistics* stats,
    std::vector<SequenceNumber> existing_snapshots,
    std::shared_ptr<Cache> table_cache,
    std::function<uint64_t()> yield_callback, EventLogger* event_logger,
    bool paranoid_file_checks)
    : job_id_(job_id),
      compact_(new CompactionState(compaction)),
      compaction_stats_(1),
      db_options_(db_options),
      env_options_(env_options),
      env_(db_options.env),
      versions_(versions),
      shutting_down_(shutting_down),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_directory_(output_directory),
      stats_(stats),
      existing_snapshots_(std::move(existing_snapshots)),
      table_cache_(std::move(table_cache)),
      yield_callback_(std::move(yield_callback)),
      event_logger_(event_logger),
      paranoid_file_checks_(paranoid_file_checks) {
  ThreadStatusUtil::SetColumnFamily(compact_->compaction->column_family_data());
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
  ReportStartedCompaction(compaction);
}

CompactionJob::~CompactionJob() {
  assert(compact_ == nullptr);
  ThreadStatusUtil::ResetThreadStatus();
}

void CompactionJob::ReportStartedCompaction(
    Compaction* compaction) {
  ThreadStatusUtil::SetColumnFamily(
      compact_->compaction->column_family_data());

  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_JOB_ID,
      job_id_);

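  // Pack both levels into one 64-bit property: start level in the high
  // 32 bits, output level in the low 32 bits.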
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL,
      (static_cast<uint64_t>(compact_->compaction->start_level()) << 32) +
          compact_->compaction->output_level());

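  // Encode the compaction properties as a bitmask: bit 0 = manual
  // compaction, bit 1 = deletion compaction, bit 2 = trivial move.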
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_PROP_FLAGS,
      compaction->IsManualCompaction() +
          (compaction->IsDeletionCompaction() << 1) +
          (compaction->IsTrivialMove() << 2));

  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES,
      compaction->CalculateTotalInputSize());

  IOSTATS_RESET(bytes_written);
  IOSTATS_RESET(bytes_read);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_WRITTEN, 0);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_READ, 0);

  // Set the thread operation after operation properties
  // to ensure GetThreadList() can always show them all together.
  ThreadStatusUtil::SetThreadOperation(
      ThreadStatus::OP_COMPACTION);
}

void CompactionJob::Prepare() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_PREPARE);
  compact_->CleanupBatchBuffer();
  compact_->CleanupMergedBuffer();

  // Generate file_levels_ for compaction before making the input iterator
  ColumnFamilyData* cfd __attribute__((unused)) =
      compact_->compaction->column_family_data();
  assert(cfd != nullptr);

  assert(cfd->current()->storage_info()->NumLevelFiles(
             compact_->compaction->level()) > 0);
  assert(compact_->builder == nullptr);
  assert(!compact_->outfile);

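  // When there are no external snapshots, visible_at_tip_ is set to the
  // last sequence number and every key is treated as visible at the tip;
  // otherwise it stays 0 and per-key visibility is computed against
  // existing_snapshots_.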
  visible_at_tip_ = 0;
  latest_snapshot_ = 0;
  if (existing_snapshots_.size() == 0) {
    // optimize for fast path if there are no snapshots
    visible_at_tip_ = versions_->LastSequence();
    earliest_snapshot_ = visible_at_tip_;
  } else {
    latest_snapshot_ = existing_snapshots_.back();
    // Add the current seqno as the 'latest' virtual
    // snapshot to the end of this list.
    existing_snapshots_.push_back(versions_->LastSequence());
    earliest_snapshot_ = existing_snapshots_[0];
  }

  // Is this compaction producing files at the bottommost level?
  bottommost_level_ = compact_->compaction->BottomMostLevel();
}

Status CompactionJob::Run() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_RUN);
  TEST_SYNC_POINT("CompactionJob::Run():Start");
  log_buffer_->FlushBufferToLog();
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();

  auto* compaction = compact_->compaction;
  // Let's check if anything will get logged. Don't prepare all the info if
  // we're not logging
  if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
    Compaction::InputLevelSummaryBuffer inputs_summary;
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] Compacting %s, score %.2f", cfd->GetName().c_str(),
        job_id_, compaction->InputLevelSummary(&inputs_summary),
        compaction->score());
    char scratch[2345];
    compact_->compaction->Summary(scratch, sizeof(scratch));
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[%s] Compaction start summary: %s\n", cfd->GetName().c_str(), scratch);
    // build event logger report
    auto stream = event_logger_->Log();
    stream << "job" << job_id_ << "event"
           << "compaction_started";
    for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
      stream << ("files_L" + ToString(compaction->level(i)));
      stream.StartArray();
      for (auto f : *compaction->inputs(i)) {
        stream << f->fd.GetNumber();
      }
      stream.EndArray();
    }
    stream << "score" << compaction->score() << "input_data_size"
           << compaction->CalculateTotalInputSize();
  }

  const uint64_t start_micros = env_->NowMicros();
  std::unique_ptr<Iterator> input(
      versions_->MakeInputIterator(compact_->compaction));
  input->SeekToFirst();

  Status status;
  ParsedInternalKey ikey;
  std::unique_ptr<CompactionFilterV2> compaction_filter_from_factory_v2 =
      nullptr;
  auto context = compact_->GetFilterContext();
  compaction_filter_from_factory_v2 =
      cfd->ioptions()->compaction_filter_factory_v2->CreateCompactionFilterV2(
          context);
  auto compaction_filter_v2 = compaction_filter_from_factory_v2.get();

  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
  if (!compaction_filter_v2) {
    status = ProcessKeyValueCompaction(&imm_micros, input.get(), false);
  } else {
    // Iterate through the input:
    // 1) buffer ineligible keys and eligible value keys into 2 separate
    //    buffers;
    // 2) send the value buffer to the compaction filter and alter the
    //    values;
    // 3) merge the value buffer with the ineligible buffer in key order;
    // 4) run the merged batch through the regular compaction loop below.
    bool prefix_initialized = false;
    shared_ptr<Iterator> backup_input(
        versions_->MakeInputIterator(compact_->compaction));
    backup_input->SeekToFirst();
    uint64_t total_filter_time = 0;
    while (backup_input->Valid() &&
           !shutting_down_->load(std::memory_order_acquire) &&
           !cfd->IsDropped()) {
      // FLUSH preempts compaction
      // TODO(icanadi) this currently only checks if flush is necessary on
      // compacting column family. we should also check if flush is necessary on
      // other column families, too

      imm_micros += yield_callback_();

      Slice key = backup_input->key();
      Slice value = backup_input->value();

      if (!ParseInternalKey(key, &ikey)) {
        // log error
        Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
            "[%s] [JOB %d] Failed to parse key: %s", cfd->GetName().c_str(),
            job_id_, key.ToString().c_str());
        // Skip the unparsable key; without advancing the iterator this
        // loop would spin on the same key forever.
        backup_input->Next();
        continue;
      } else {
        const SliceTransform* transformer =
            cfd->ioptions()->compaction_filter_factory_v2->GetPrefixExtractor();
        const auto key_prefix = transformer->Transform(ikey.user_key);
        if (!prefix_initialized) {
          compact_->cur_prefix_ = key_prefix.ToString();
          prefix_initialized = true;
        }
        // If the prefix remains the same, keep buffering
        if (key_prefix.compare(Slice(compact_->cur_prefix_)) == 0) {
          // Apply the compaction filter V2 to all the kv pairs sharing
          // the same prefix
          if (ikey.type == kTypeValue &&
              (visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
            // Buffer all keys sharing the same prefix for CompactionFilterV2
            // Iterate through keys to check prefix
            compact_->BufferKeyValueSlices(key, value);
          } else {
            // buffer ineligible keys
            compact_->BufferOtherKeyValueSlices(key, value);
          }
          backup_input->Next();
          continue;
          // finish changing values for eligible keys
        } else {
          // Now prefix changes, this batch is done.
          // Call compaction filter on the buffered values to change the value
          if (compact_->key_str_buf_.size() > 0) {
            uint64_t time = 0;
            CallCompactionFilterV2(compaction_filter_v2, &time);
            total_filter_time += time;
          }
          compact_->cur_prefix_ = key_prefix.ToString();
        }
      }

      // Merge this batch of data (values + ineligible keys)
      compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());

      // Done buffering for the current prefix. Spit it out to disk
      // Now just iterate through all the kv-pairs
      status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);

      if (!status.ok()) {
        break;
      }

      // After writing the kv-pairs, we can safely remove the reference
      // to the string buffer and clean them up
      compact_->CleanupBatchBuffer();
      compact_->CleanupMergedBuffer();
      // Buffer the key that triggers the mismatch in prefix
      if (ikey.type == kTypeValue &&
          (visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
        compact_->BufferKeyValueSlices(key, value);
      } else {
        compact_->BufferOtherKeyValueSlices(key, value);
      }
      backup_input->Next();
      if (!backup_input->Valid()) {
        // The input is exhausted; filter, merge, and flush the final
        // buffered batch.
        if (compact_->key_str_buf_.size() > 0) {
          uint64_t time = 0;
          CallCompactionFilterV2(compaction_filter_v2, &time);
          total_filter_time += time;
        }
        compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());

        status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);
        if (!status.ok()) {
          break;
        }

        compact_->CleanupBatchBuffer();
        compact_->CleanupMergedBuffer();
      }
    }  // done processing all prefix batches
    // finish the last batch
    if (status.ok()) {
      if (compact_->key_str_buf_.size() > 0) {
        uint64_t time = 0;
        CallCompactionFilterV2(compaction_filter_v2, &time);
        total_filter_time += time;
      }
      compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
      status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);
    }
    RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
  }  // checking for compaction filter v2

  if (status.ok() &&
      (shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) {
    status = Status::ShutdownInProgress(
        "Database shutdown or Column family drop during compaction");
  }
  if (status.ok() && compact_->builder != nullptr) {
    status = FinishCompactionOutputFile(input.get());
  }
  if (status.ok()) {
    status = input->status();
  }
  input.reset();

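  // Fsync the output directory so that the newly created files' directory
  // entries are durable before the manifest references them.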
  if (output_directory_ && !db_options_.disableDataSync) {
    output_directory_->Fsync();
  }

  compaction_stats_.micros = env_->NowMicros() - start_micros - imm_micros;
  compaction_stats_.files_in_leveln =
      static_cast<int>(compact_->compaction->num_input_files(0));
  compaction_stats_.files_in_levelnp1 =
      static_cast<int>(compact_->compaction->num_input_files(1));
  MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);

  size_t num_output_files = compact_->outputs.size();
  if (compact_->builder != nullptr) {
    // An error occurred so ignore the last output.
    assert(num_output_files > 0);
    --num_output_files;
  }
  compaction_stats_.files_out_levelnp1 = static_cast<int>(num_output_files);

  for (size_t i = 0; i < compact_->compaction->num_input_files(0); i++) {
    compaction_stats_.bytes_readn +=
        compact_->compaction->input(0, i)->fd.GetFileSize();
    compaction_stats_.num_input_records +=
        static_cast<uint64_t>(compact_->compaction->input(0, i)->num_entries);
  }

  for (size_t i = 0; i < compact_->compaction->num_input_files(1); i++) {
    compaction_stats_.bytes_readnp1 +=
        compact_->compaction->input(1, i)->fd.GetFileSize();
  }

  for (size_t i = 0; i < num_output_files; i++) {
    compaction_stats_.bytes_written += compact_->outputs[i].file_size;
  }
  if (compact_->num_input_records > compact_->num_output_records) {
    compaction_stats_.num_dropped_records +=
        compact_->num_input_records - compact_->num_output_records;
  }

  RecordCompactionIOStats();

  LogFlush(db_options_.info_log);
  TEST_SYNC_POINT("CompactionJob::Run():End");
  return status;
}

void CompactionJob::Install(Status* status,
                            const MutableCFOptions& mutable_cf_options,
                            InstrumentedMutex* db_mutex) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_INSTALL);
  db_mutex->AssertHeld();
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  cfd->internal_stats()->AddCompactionStats(
      compact_->compaction->output_level(), compaction_stats_);

  if (status->ok()) {
    *status = InstallCompactionResults(db_mutex, mutable_cf_options);
  }
  VersionStorageInfo::LevelSummaryStorage tmp;
  auto vstorage = cfd->current()->storage_info();
  const auto& stats = compaction_stats_;
  LogToBuffer(log_buffer_,
              "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
              "files in(%d, %d) out(%d) "
              "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
              "write-amplify(%.1f) %s, records in: %" PRIu64
              ", records dropped: %" PRIu64 "\n",
              cfd->GetName().c_str(), vstorage->LevelSummary(&tmp),
              (stats.bytes_readn + stats.bytes_readnp1) /
                  static_cast<double>(stats.micros),
              stats.bytes_written / static_cast<double>(stats.micros),
              compact_->compaction->output_level(), stats.files_in_leveln,
              stats.files_in_levelnp1, stats.files_out_levelnp1,
              stats.bytes_readn / 1048576.0, stats.bytes_readnp1 / 1048576.0,
              stats.bytes_written / 1048576.0,
              (stats.bytes_written + stats.bytes_readnp1 + stats.bytes_readn) /
                  static_cast<double>(stats.bytes_readn),
              stats.bytes_written / static_cast<double>(stats.bytes_readn),
              status->ToString().c_str(), stats.num_input_records,
              stats.num_dropped_records);

  auto stream = event_logger_->LogToBuffer(log_buffer_);
  stream << "job" << job_id_ << "event"
         << "compaction_finished"
         << "output_level" << compact_->compaction->output_level()
         << "num_output_files" << compact_->outputs.size()
         << "total_output_size" << compact_->total_bytes << "num_input_records"
         << compact_->num_input_records << "num_output_records"
         << compact_->num_output_records;
  stream << "lsm_state";
  stream.StartArray();
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();

  CleanupCompaction(*status);
}

Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
                                                Iterator* input,
                                                bool is_compaction_v2) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
  size_t combined_idx = 0;
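  // Index into the merged prefix batch; only used when is_compaction_v2
  // is on.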
  Status status;
  std::string compaction_filter_value;
  ParsedInternalKey ikey;
  IterKey current_user_key;
  bool has_current_user_key = false;
  IterKey delete_key;
  SequenceNumber last_sequence_for_key __attribute__((unused)) =
      kMaxSequenceNumber;
  SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
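  // visible_in_snapshot holds the earliest snapshot in which the
  // previously processed (newer) instance of the current user key is
  // visible; if the current (older) instance maps to the same snapshot
  // it is shadowed and dropped (rule (A) below).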
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  MergeHelper merge(cfd->user_comparator(), cfd->ioptions()->merge_operator,
                    db_options_.info_log.get(),
                    cfd->ioptions()->min_partial_merge_operands,
                    false /* internal key corruption is expected */);
  auto compaction_filter = cfd->ioptions()->compaction_filter;
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
  if (!compaction_filter) {
    auto context = compact_->GetFilterContextV1();
    compaction_filter_from_factory =
        cfd->ioptions()->compaction_filter_factory->CreateCompactionFilter(
            context);
    compaction_filter = compaction_filter_from_factory.get();
  }

  TEST_SYNC_POINT("CompactionJob::Run():Inprogress");

  int64_t key_drop_user = 0;
  int64_t key_drop_newer_entry = 0;
  int64_t key_drop_obsolete = 0;
  int64_t loop_cnt = 0;

  StopWatchNano timer(env_, stats_ != nullptr);
  uint64_t total_filter_time = 0;
  while (input->Valid() && !shutting_down_->load(std::memory_order_acquire) &&
         !cfd->IsDropped() && status.ok()) {
    compact_->num_input_records++;
    if (++loop_cnt > 1000) {
      if (key_drop_user > 0) {
        RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
        key_drop_user = 0;
      }
      if (key_drop_newer_entry > 0) {
        RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
                   key_drop_newer_entry);
        key_drop_newer_entry = 0;
      }
      if (key_drop_obsolete > 0) {
        RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete);
        key_drop_obsolete = 0;
      }
      RecordCompactionIOStats();
      loop_cnt = 0;
    }
    // FLUSH preempts compaction
    // TODO(icanadi) this currently only checks if flush is necessary on
    // compacting column family. we should also check if flush is necessary on
    // other column families, too
    (*imm_micros) += yield_callback_();

    Slice key;
    Slice value;
    // If is_compaction_v2 is on, kv-pairs are read from the buffered
    // prefix batch, which already contains the results of running
    // compaction_filter_v2.
    //
    // If is_compaction_v2 is off, this function goes through all the
    // kv-pairs in input.
    if (!is_compaction_v2) {
      key = input->key();
      value = input->value();
    } else {
      if (combined_idx >= compact_->combined_key_buf_.size()) {
        break;
      }
      assert(combined_idx < compact_->combined_key_buf_.size());
      key = compact_->combined_key_buf_[combined_idx];
      value = compact_->combined_value_buf_[combined_idx];

      ++combined_idx;
    }

    if (compact_->compaction->ShouldStopBefore(key) &&
        compact_->builder != nullptr) {
      status = FinishCompactionOutputFile(input);
      if (!status.ok()) {
        break;
      }
    }

    // Handle key/value, add to state, etc.
    bool drop = false;
    bool current_entry_is_merging = false;
    if (!ParseInternalKey(key, &ikey)) {
      // Do not hide error keys
      // TODO: error key stays in db forever? Figure out the intention/rationale
      // v10 error v8 : we cannot hide v8 even though it's pretty obvious.
      current_user_key.Clear();
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
      visible_in_snapshot = kMaxSequenceNumber;
    } else {
      if (!has_current_user_key ||
          cfd->user_comparator()->Compare(ikey.user_key,
                                          current_user_key.GetKey()) != 0) {
        // First occurrence of this user key
        current_user_key.SetKey(ikey.user_key);
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
        visible_in_snapshot = kMaxSequenceNumber;
        // apply the compaction filter to the first occurrence of the user key
        if (compaction_filter && !is_compaction_v2 && ikey.type == kTypeValue &&
            (visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
          // If the user has specified a compaction filter and the sequence
          // number is greater than any external snapshot, then invoke the
          // filter.
          // If the return value of the compaction filter is true, replace
          // the entry with a delete marker.
          bool value_changed = false;
          compaction_filter_value.clear();
          if (stats_ != nullptr) {
            timer.Start();
          }
          bool to_delete = compaction_filter->Filter(
              compact_->compaction->level(), ikey.user_key, value,
              &compaction_filter_value, &value_changed);
          total_filter_time += timer.ElapsedNanos();
          if (to_delete) {
            // make a copy of the original key and convert it to a delete
            delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence,
                                      kTypeDeletion);
            // anchor the key again
            key = delete_key.GetKey();
            // needed because ikey is backed by key
            ParseInternalKey(key, &ikey);
            // no value associated with delete
            value.clear();
            ++key_drop_user;
          } else if (value_changed) {
            value = compaction_filter_value;
          }
        }
      }

      // If there are no snapshots, then this kv affects visibility at tip.
      // Otherwise, search through all existing snapshots to find the
      // earliest snapshot that is affected by this kv.
      SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
      SequenceNumber visible =
          visible_at_tip_
              ? visible_at_tip_
              : findEarliestVisibleSnapshot(ikey.sequence, existing_snapshots_,
                                            &prev_snapshot);

      if (visible_in_snapshot == visible) {
        // If the earliest snapshot in which this key is visible is the
        // same as the visibility of a previous instance of the same user
        // key, then this kv is not visible in any snapshot: it is hidden
        // by a newer entry for the same user key.
        // TODO: why not > ?
        assert(last_sequence_for_key >= ikey.sequence);
        drop = true;  // (A)
        ++key_drop_newer_entry;
      } else if (ikey.type == kTypeDeletion &&
                 ikey.sequence <= earliest_snapshot_ &&
                 compact_->compaction->KeyNotExistsBeyondOutputLevel(
                     ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        drop = true;
        ++key_drop_obsolete;
      } else if (ikey.type == kTypeMerge) {
        if (!merge.HasOperator()) {
          LogToBuffer(log_buffer_, "Options::merge_operator is null.");
          status = Status::InvalidArgument(
              "merge_operator is not properly initialized.");
          break;
        }
        // We know the merge-type entry is not hidden; otherwise we would
        // have hit (A) above.
        // We encapsulate the merge-related state machine in a separate
        // object to minimize changes to the existing flow. It turns out
        // this logic can also be nicely reused for the memtable flush
        // purge optimization in BuildTable.
        int steps = 0;
        merge.MergeUntil(input, prev_snapshot, bottommost_level_,
                         db_options_.statistics.get(), &steps, env_);
        // Skip the Merge ops
        combined_idx = combined_idx - 1 + steps;

        current_entry_is_merging = true;
        if (merge.IsSuccess()) {
          // Successfully found Put/Delete/(end-of-key-range) while merging
          // Get the merge result
          key = merge.key();
          ParseInternalKey(key, &ikey);
          value = merge.value();
        } else {
          // Did not find a Put/Delete/(end-of-key-range) while merging
          // We now have some stack of merge operands to write out.
          // NOTE: key,value, and ikey are now referring to old entries.
          //       These will be correctly set below.
          assert(!merge.keys().empty());
          assert(merge.keys().size() == merge.values().size());

          // Hack to make sure last_sequence_for_key is correct
          ParseInternalKey(merge.keys().front(), &ikey);
        }
      }

      last_sequence_for_key = ikey.sequence;
      visible_in_snapshot = visible;
    }

    if (!drop) {
      // We may write a single key (e.g.: for Put/Delete or successful merge).
      // Or we may instead have to write a sequence/list of keys.
      // We have to write a sequence iff we have an unsuccessful merge
      bool has_merge_list = current_entry_is_merging && !merge.IsSuccess();
      const std::deque<std::string>* keys = nullptr;
      const std::deque<std::string>* values = nullptr;
      std::deque<std::string>::const_reverse_iterator key_iter;
      std::deque<std::string>::const_reverse_iterator value_iter;
      if (has_merge_list) {
        keys = &merge.keys();
        values = &merge.values();
        key_iter = keys->rbegin();  // The back (*rbegin()) is the first key
        value_iter = values->rbegin();

        key = Slice(*key_iter);
        value = Slice(*value_iter);
      }

      // If we have a list of keys to write, traverse the list.
      // If we have a single key to write, simply write that key.
      while (true) {
        // Invariant: key,value,ikey will always be the next entry to write
        char* kptr = (char*)key.data();
        std::string kstr;

        // Zeroing out the sequence number leads to better compression.
        // If this is the bottommost level (no files in lower levels)
        // and the earliest snapshot is larger than this seqno
        // then we can squash the seqno to zero.
        if (bottommost_level_ && ikey.sequence < earliest_snapshot_ &&
            ikey.type != kTypeMerge) {
          assert(ikey.type != kTypeDeletion);
          // make a copy because updating in place would cause problems
          // with the priority queue that is managing the input key iterator
          kstr.assign(key.data(), key.size());
          kptr = (char*)kstr.c_str();
          UpdateInternalKey(kptr, key.size(), (uint64_t)0, ikey.type);
        }

        Slice newkey(kptr, key.size());
        assert((key.clear(), 1));  // we do not need 'key' anymore

        // Open output file if necessary
        if (compact_->builder == nullptr) {
          status = OpenCompactionOutputFile();
          if (!status.ok()) {
            break;
          }
        }

        SequenceNumber seqno = GetInternalKeySeqno(newkey);
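        // Track the key range and seqno range of the current output file;
        // these populate its FileMetaData when the results are installed.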
        if (compact_->builder->NumEntries() == 0) {
          compact_->current_output()->smallest.DecodeFrom(newkey);
          compact_->current_output()->smallest_seqno = seqno;
        } else {
          compact_->current_output()->smallest_seqno =
              std::min(compact_->current_output()->smallest_seqno, seqno);
        }
        compact_->current_output()->largest.DecodeFrom(newkey);
        compact_->builder->Add(newkey, value);
        compact_->num_output_records++;
        compact_->current_output()->largest_seqno =
            std::max(compact_->current_output()->largest_seqno, seqno);

        // Close output file if it is big enough
        if (compact_->builder->FileSize() >=
            compact_->compaction->MaxOutputFileSize()) {
          status = FinishCompactionOutputFile(input);
          if (!status.ok()) {
            break;
          }
        }

        // If we have a list of entries, move to next element
        // If we only had one entry, then break the loop.
        if (has_merge_list) {
          ++key_iter;
          ++value_iter;

          // If at end of list
          if (key_iter == keys->rend() || value_iter == values->rend()) {
            // Sanity Check: if one ends, then both end
            assert(key_iter == keys->rend() && value_iter == values->rend());
            break;
          }

          // Otherwise not at end of list. Update key, value, and ikey.
          key = Slice(*key_iter);
          value = Slice(*value_iter);
          ParseInternalKey(key, &ikey);

        } else {
          // Only had one item to begin with (Put/Delete)
          break;
        }
      }  // while (true)
    }    // if (!drop)

    // MergeUntil has moved input to the next entry
    if (!current_entry_is_merging) {
      input->Next();
    }
  }
  RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
  if (key_drop_user > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
  }
  if (key_drop_newer_entry > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, key_drop_newer_entry);
  }
  if (key_drop_obsolete > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete);
  }
  RecordCompactionIOStats();

  return status;
}

void CompactionJob::CallCompactionFilterV2(
    CompactionFilterV2* compaction_filter_v2, uint64_t* time) {
  if (compact_ == nullptr || compaction_filter_v2 == nullptr) {
    return;
  }
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_FILTER_V2);

  // Assemble slice vectors for user keys and existing values.
  // We also keep track of our parsed internal key structs because
  // we may need to access the sequence number in the event that
  // keys are garbage collected during the filter process.
  std::vector<ParsedInternalKey> ikey_buf;
  std::vector<Slice> user_key_buf;
  std::vector<Slice> existing_value_buf;

  for (const auto& key : compact_->key_str_buf_) {
    ParsedInternalKey ikey;
    ParseInternalKey(Slice(key), &ikey);
    ikey_buf.emplace_back(ikey);
    user_key_buf.emplace_back(ikey.user_key);
  }
  for (const auto& value : compact_->existing_value_str_buf_) {
    existing_value_buf.emplace_back(Slice(value));
  }

  // If the user has specified a compaction filter and the sequence
  // number is greater than any external snapshot, then invoke the
  // filter.
  // If the return value of the compaction filter is true, replace
  // the entry with a delete marker.
  StopWatchNano timer(env_, stats_ != nullptr);
  compact_->to_delete_buf_ = compaction_filter_v2->Filter(
      compact_->compaction->level(), user_key_buf, existing_value_buf,
      &compact_->new_value_buf_, &compact_->value_changed_buf_);
  *time = timer.ElapsedNanos();
  // new_value_buf_.size() <= to_delete_buf_.size(), with equality iff
  // every kv-pair in this batch had its value changed by the filter.
  assert(compact_->to_delete_buf_.size() == compact_->key_str_buf_.size());
  assert(compact_->to_delete_buf_.size() ==
         compact_->existing_value_str_buf_.size());
  assert(compact_->value_changed_buf_.empty() ||
         compact_->to_delete_buf_.size() ==
         compact_->value_changed_buf_.size());

  int new_value_idx = 0;
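  // new_value_buf_ holds only the changed values, in order, so
  // new_value_idx tracks how many of them have been consumed so far.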
  for (unsigned int i = 0; i < compact_->to_delete_buf_.size(); ++i) {
    if (compact_->to_delete_buf_[i]) {
      // Update the string buffer in place; any Slice pointing into it
      // now sees the deletion key.
      UpdateInternalKey(&compact_->key_str_buf_[i][0],
                        compact_->key_str_buf_[i].size(), ikey_buf[i].sequence,
                        kTypeDeletion);

      // no value associated with delete
      compact_->existing_value_str_buf_[i].clear();
      RecordTick(stats_, COMPACTION_KEY_DROP_USER);
    } else if (!compact_->value_changed_buf_.empty() &&
        compact_->value_changed_buf_[i]) {
      compact_->existing_value_str_buf_[i] =
          compact_->new_value_buf_[new_value_idx++];
    }
  }  // for
}

Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
  assert(compact_ != nullptr);
  assert(compact_->outfile);
  assert(compact_->builder != nullptr);

  const uint64_t output_number = compact_->current_output()->number;
  const uint32_t output_path_id = compact_->current_output()->path_id;
  assert(output_number != 0);

  TableProperties table_properties;
  // Check for iterator errors
  Status s = input->status();
  const uint64_t current_entries = compact_->builder->NumEntries();
  if (s.ok()) {
    s = compact_->builder->Finish();
  } else {
    compact_->builder->Abandon();
  }
  if (s.ok()) {
    table_properties = compact_->builder->GetTableProperties();
  }
  const uint64_t current_bytes = compact_->builder->FileSize();
  compact_->current_output()->file_size = current_bytes;
  compact_->total_bytes += current_bytes;
  compact_->builder.reset();

  // Finish and check for file errors
  if (s.ok() && !db_options_.disableDataSync) {
    if (db_options_.use_fsync) {
      StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
      s = compact_->outfile->Fsync();
    } else {
      StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
      s = compact_->outfile->Sync();
    }
  }
  if (s.ok()) {
    s = compact_->outfile->Close();
  }
  compact_->outfile.reset();

  if (s.ok() && current_entries > 0) {
    // Verify that the table is usable
    ColumnFamilyData* cfd = compact_->compaction->column_family_data();
    FileDescriptor fd(output_number, output_path_id, current_bytes);
    Iterator* iter = cfd->table_cache()->NewIterator(
        ReadOptions(), env_options_, cfd->internal_comparator(), fd);
    s = iter->status();

    if (s.ok() && paranoid_file_checks_) {
      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
      s = iter->status();
    }

    delete iter;
    if (s.ok()) {
      Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
          " keys, %" PRIu64 " bytes",
          cfd->GetName().c_str(), job_id_, output_number, current_entries,
          current_bytes);
      EventLoggerHelpers::LogTableFileCreation(event_logger_, job_id_,
                                               output_number, current_bytes,
                                               table_properties);
    }
  }
  return s;
}

Status CompactionJob::InstallCompactionResults(
    InstrumentedMutex* db_mutex, const MutableCFOptions& mutable_cf_options) {
  db_mutex->AssertHeld();

  auto* compaction = compact_->compaction;
  // paranoia: verify that the files that we started with
  // still exist in the current version and in the same original level.
  // This ensures that a concurrent compaction did not erroneously
  // pick the same files to compact.
  if (!versions_->VerifyCompactionFileConsistency(compaction)) {
    Compaction::InputLevelSummaryBuffer inputs_summary;

    Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] Compaction %s aborted",
        compaction->column_family_data()->GetName().c_str(), job_id_,
        compaction->InputLevelSummary(&inputs_summary));
    return Status::Corruption("Compaction input files inconsistent");
  }

  {
    Compaction::InputLevelSummaryBuffer inputs_summary;
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
        compaction->column_family_data()->GetName().c_str(), job_id_,
        compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes);
  }

  // Add compaction outputs
  compaction->AddInputDeletions(compact_->compaction->edit());
  for (size_t i = 0; i < compact_->outputs.size(); i++) {
    const CompactionState::Output& out = compact_->outputs[i];
    compaction->edit()->AddFile(
        compaction->output_level(), out.number, out.path_id, out.file_size,
        out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
  }
  return versions_->LogAndApply(compaction->column_family_data(),
                                mutable_cf_options, compaction->edit(),
                                db_mutex, db_directory_);
}

// Given a sequence number, return the sequence number of the
// earliest snapshot that this sequence number is visible in.
// The snapshots themselves are arranged in ascending order of
// sequence numbers.
// Employ a sequential search because the total number of
// snapshots is typically small.
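// Example: with snapshots = {5, 10, 20} and in = 7, the loop returns 10
// and sets *prev_snapshot = 5.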
inline SequenceNumber CompactionJob::findEarliestVisibleSnapshot(
    SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
    SequenceNumber* prev_snapshot) {
  assert(snapshots.size());
  SequenceNumber prev __attribute__((unused)) = 0;
  for (const auto cur : snapshots) {
    assert(prev <= cur);
    if (cur >= in) {
      *prev_snapshot = prev;
      return cur;
    }
    prev = cur;  // assignment, not a comparison
    assert(prev);
  }
  Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
      "CompactionJob is not able to find snapshot"
      " with SeqId later than %" PRIu64
      ": current MaxSeqId is %" PRIu64 "",
      in, snapshots[snapshots.size() - 1]);
  assert(0);
  return 0;
}

void CompactionJob::RecordCompactionIOStats() {
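  // The IOSTATS counters are published to the statistics object and the
  // thread status, then reset, so each call records only the delta since
  // the previous call.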
  RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read));
  IOSTATS_RESET(bytes_read);
  RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written));
  IOSTATS_RESET(bytes_written);
}

Status CompactionJob::OpenCompactionOutputFile() {
  assert(compact_ != nullptr);
  assert(compact_->builder == nullptr);
  // no need to lock because VersionSet::next_file_number_ is atomic
  uint64_t file_number = versions_->NewFileNumber();
  // Make the output file
  std::string fname = TableFileName(db_options_.db_paths, file_number,
                                    compact_->compaction->GetOutputPathId());
  Status s = env_->NewWritableFile(fname, &compact_->outfile, env_options_);

  if (!s.ok()) {
    Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
        " fails at NewWritableFile with status %s",
        compact_->compaction->column_family_data()->GetName().c_str(), job_id_,
        file_number, s.ToString().c_str());
    LogFlush(db_options_.info_log);
    return s;
  }
  CompactionState::Output out;
  out.number = file_number;
  out.path_id = compact_->compaction->GetOutputPathId();
  out.smallest.Clear();
  out.largest.Clear();
  out.smallest_seqno = out.largest_seqno = 0;

  compact_->outputs.push_back(out);
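  // Compaction output is written at low I/O priority so it does not
  // starve foreground traffic, and space is preallocated to reduce
  // filesystem metadata updates during large sequential writes.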
  compact_->outfile->SetIOPriority(Env::IO_LOW);
  compact_->outfile->SetPreallocationBlockSize(
      static_cast<size_t>(compact_->compaction->OutputFilePreallocationSize()));

  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  bool skip_filters = false;

  // If the column family is configured to optimize filters for hits,
  // we can skip creating filters for the bottommost level: filters
  // there mainly help lookups of keys that do not exist in the database.
  if (cfd->ioptions()->optimize_filters_for_hits && bottommost_level_) {
    skip_filters = true;
  }

  compact_->builder.reset(NewTableBuilder(
      *cfd->ioptions(), cfd->internal_comparator(),
      cfd->int_tbl_prop_collector_factories(), compact_->outfile.get(),
      compact_->compaction->OutputCompressionType(),
      cfd->ioptions()->compression_opts, skip_filters));
  LogFlush(db_options_.info_log);
  return s;
}

void CompactionJob::CleanupCompaction(const Status& status) {
  if (compact_->builder != nullptr) {
    // May happen if we get a shutdown call in the middle of compaction
    compact_->builder->Abandon();
    compact_->builder.reset();
  } else {
    assert(!status.ok() || compact_->outfile == nullptr);
  }
  for (size_t i = 0; i < compact_->outputs.size(); i++) {
    const CompactionState::Output& out = compact_->outputs[i];

    // If this file was inserted into the table cache then remove
    // it here because this compaction was not committed.
    if (!status.ok()) {
      TableCache::Evict(table_cache_.get(), out.number);
    }
  }
  delete compact_;
  compact_ = nullptr;
}

}  // namespace rocksdb