https://github.com/facebook/rocksdb
Raw File
Tip revision: f4441966592636253fd5ab0bb9ed44fc2697fc53 authored by anand76 on 11 March 2024, 18:26:24 UTC
Add a FS flag to detect and correct corruption (#12408)
Tip revision: f444196
merge_helper.h
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
#pragma once

#include <deque>
#include <string>
#include <vector>

#include "db/merge_context.h"
#include "db/range_del_aggregator.h"
#include "db/snapshot_checker.h"
#include "db/wide/wide_column_serialization.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
#include "rocksdb/wide_columns.h"
#include "util/stop_watch.h"

namespace ROCKSDB_NAMESPACE {

class Comparator;
class Iterator;
class Logger;
class MergeOperator;
class Statistics;
class SystemClock;
class BlobFetcher;
class PrefetchBufferCollection;
struct CompactionIterationStats;

class MergeHelper {
 public:
  MergeHelper(Env* env, const Comparator* user_comparator,
              const MergeOperator* user_merge_operator,
              const CompactionFilter* compaction_filter, Logger* logger,
              bool assert_valid_internal_key, SequenceNumber latest_snapshot,
              const SnapshotChecker* snapshot_checker = nullptr, int level = 0,
              Statistics* stats = nullptr,
              const std::atomic<bool>* shutting_down = nullptr);

  // Wrappers around MergeOperator::FullMergeV3() that record perf statistics.
  // Set `update_num_ops_stats` to true if it is from a user read so that
  // the corresponding statistics are updated.
  // Returns one of the following statuses:
  // - OK: Entries were successfully merged.
  // - Corruption: Merge operator reported unsuccessful merge. The scope of the
  //   damage will be stored in `*op_failure_scope` when `op_failure_scope` is
  //   not nullptr

  // Empty tag types to disambiguate overloads
  struct NoBaseValueTag {};
  static constexpr NoBaseValueTag kNoBaseValue{};

  struct PlainBaseValueTag {};
  static constexpr PlainBaseValueTag kPlainBaseValue{};

  struct WideBaseValueTag {};
  static constexpr WideBaseValueTag kWideBaseValue{};

  template <typename... ResultTs>
  static Status TimedFullMerge(const MergeOperator* merge_operator,
                               const Slice& key, NoBaseValueTag,
                               const std::vector<Slice>& operands,
                               Logger* logger, Statistics* statistics,
                               SystemClock* clock, bool update_num_ops_stats,
                               MergeOperator::OpFailureScope* op_failure_scope,
                               ResultTs... results) {
    MergeOperator::MergeOperationInputV3::ExistingValue existing_value;

    return TimedFullMergeImpl(
        merge_operator, key, std::move(existing_value), operands, logger,
        statistics, clock, update_num_ops_stats, op_failure_scope, results...);
  }

  template <typename... ResultTs>
  static Status TimedFullMerge(
      const MergeOperator* merge_operator, const Slice& key, PlainBaseValueTag,
      const Slice& value, const std::vector<Slice>& operands, Logger* logger,
      Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
      MergeOperator::OpFailureScope* op_failure_scope, ResultTs... results) {
    MergeOperator::MergeOperationInputV3::ExistingValue existing_value(value);

    return TimedFullMergeImpl(
        merge_operator, key, std::move(existing_value), operands, logger,
        statistics, clock, update_num_ops_stats, op_failure_scope, results...);
  }

  template <typename... ResultTs>
  static Status TimedFullMerge(
      const MergeOperator* merge_operator, const Slice& key, WideBaseValueTag,
      const Slice& entity, const std::vector<Slice>& operands, Logger* logger,
      Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
      MergeOperator::OpFailureScope* op_failure_scope, ResultTs... results) {
    MergeOperator::MergeOperationInputV3::ExistingValue existing_value;

    Slice entity_copy(entity);
    WideColumns existing_columns;

    const Status s =
        WideColumnSerialization::Deserialize(entity_copy, existing_columns);
    if (!s.ok()) {
      return s;
    }

    existing_value = std::move(existing_columns);

    return TimedFullMergeImpl(
        merge_operator, key, std::move(existing_value), operands, logger,
        statistics, clock, update_num_ops_stats, op_failure_scope, results...);
  }

  template <typename... ResultTs>
  static Status TimedFullMerge(const MergeOperator* merge_operator,
                               const Slice& key, WideBaseValueTag,
                               const WideColumns& columns,
                               const std::vector<Slice>& operands,
                               Logger* logger, Statistics* statistics,
                               SystemClock* clock, bool update_num_ops_stats,
                               MergeOperator::OpFailureScope* op_failure_scope,
                               ResultTs... results) {
    MergeOperator::MergeOperationInputV3::ExistingValue existing_value(columns);

    return TimedFullMergeImpl(
        merge_operator, key, std::move(existing_value), operands, logger,
        statistics, clock, update_num_ops_stats, op_failure_scope, results...);
  }

  // During compaction, merge entries until we hit
  //     - a corrupted key
  //     - a Put/Delete,
  //     - a different user key,
  //     - a specific sequence number (snapshot boundary),
  //     - REMOVE_AND_SKIP_UNTIL returned from compaction filter,
  //  or - the end of iteration
  //
  // The result(s) of the merge can be accessed in `MergeHelper::keys()` and
  // `MergeHelper::values()`, which are invalidated the next time `MergeUntil()`
  // is called. `MergeOutputIterator` is specially designed to iterate the
  // results of a `MergeHelper`'s most recent `MergeUntil()`.
  //
  // iter: (IN)  points to the first merge type entry
  //       (OUT) points to the first entry not included in the merge process
  // range_del_agg: (IN) filters merge operands covered by range tombstones.
  // stop_before: (IN) a sequence number that merge should not cross.
  //                   0 means no restriction
  // at_bottom:   (IN) true if the iterator covers the bottem level, which means
  //                   we could reach the start of the history of this user key.
  // allow_data_in_errors: (IN) if true, data details will be displayed in
  //                   error/log messages.
  // blob_fetcher: (IN) blob fetcher object for the compaction's input version.
  // prefetch_buffers: (IN/OUT) a collection of blob file prefetch buffers
  //                            used for compaction readahead.
  // c_iter_stats: (OUT) compaction iteration statistics.
  //
  // Returns one of the following statuses:
  // - OK: Entries were successfully merged.
  // - MergeInProgress: Output consists of merge operands only.
  // - Corruption: Merge operator reported unsuccessful merge or a corrupted
  //   key has been encountered and not expected (applies only when compiling
  //   with asserts removed).
  // - ShutdownInProgress: interrupted by shutdown (*shutting_down == true).
  //
  // REQUIRED: The first key in the input is not corrupted.
  Status MergeUntil(InternalIterator* iter,
                    CompactionRangeDelAggregator* range_del_agg,
                    const SequenceNumber stop_before, const bool at_bottom,
                    const bool allow_data_in_errors,
                    const BlobFetcher* blob_fetcher,
                    const std::string* const full_history_ts_low,
                    PrefetchBufferCollection* prefetch_buffers,
                    CompactionIterationStats* c_iter_stats);

  // Filters a merge operand using the compaction filter specified
  // in the constructor. Returns the decision that the filter made.
  // Uses compaction_filter_value_ and compaction_filter_skip_until_ for the
  // optional outputs of compaction filter.
  // user_key includes timestamp if user-defined timestamp is enabled.
  CompactionFilter::Decision FilterMerge(const Slice& user_key,
                                         const Slice& value_slice);

  // Query the merge result
  // These are valid until the next MergeUntil call
  // If the merging was successful:
  //   - keys() contains a single element with the latest sequence number of
  //     the merges. The type will be Put or Merge. See IMPORTANT 1 note, below.
  //   - values() contains a single element with the result of merging all the
  //     operands together
  //
  //   IMPORTANT 1: the key type could change after the MergeUntil call.
  //        Put/Delete + Merge + ... + Merge => Put
  //        Merge + ... + Merge => Merge
  //
  // If the merge operator is not associative, and if a Put/Delete is not found
  // then the merging will be unsuccessful. In this case:
  //   - keys() contains the list of internal keys seen in order of iteration.
  //   - values() contains the list of values (merges) seen in the same order.
  //              values() is parallel to keys() so that the first entry in
  //              keys() is the key associated with the first entry in values()
  //              and so on. These lists will be the same length.
  //              All of these pairs will be merges over the same user key.
  //              See IMPORTANT 2 note below.
  //
  //   IMPORTANT 2: The entries were traversed in order from BACK to FRONT.
  //                So keys().back() was the first key seen by iterator.
  // TODO: Re-style this comment to be like the first one
  const std::deque<std::string>& keys() const { return keys_; }
  const std::vector<Slice>& values() const {
    return merge_context_.GetOperands();
  }
  uint64_t TotalFilterTime() const { return total_filter_time_; }
  bool HasOperator() const { return user_merge_operator_ != nullptr; }

  // If compaction filter returned REMOVE_AND_SKIP_UNTIL, this method will
  // return true and fill *until with the key to which we should skip.
  // If true, keys() and values() are empty.
  bool FilteredUntil(Slice* skip_until) const {
    if (!has_compaction_filter_skip_until_) {
      return false;
    }
    assert(compaction_filter_ != nullptr);
    assert(skip_until != nullptr);
    assert(compaction_filter_skip_until_.Valid());
    *skip_until = compaction_filter_skip_until_.Encode();
    return true;
  }

 private:
  Env* env_;
  SystemClock* clock_;
  const Comparator* user_comparator_;
  const MergeOperator* user_merge_operator_;
  const CompactionFilter* compaction_filter_;
  const std::atomic<bool>* shutting_down_;
  Logger* logger_;
  bool assert_valid_internal_key_;  // enforce no internal key corruption?
  bool allow_single_operand_;
  SequenceNumber latest_snapshot_;
  const SnapshotChecker* const snapshot_checker_;
  int level_;

  // the scratch area that holds the result of MergeUntil
  // valid up to the next MergeUntil call

  // Keeps track of the sequence of keys seen
  std::deque<std::string> keys_;
  // Parallel with keys_; stores the operands
  mutable MergeContext merge_context_;

  StopWatchNano filter_timer_;
  uint64_t total_filter_time_;
  Statistics* stats_;

  bool has_compaction_filter_skip_until_ = false;
  std::string compaction_filter_value_;
  InternalKey compaction_filter_skip_until_;

  bool IsShuttingDown() {
    // This is a best-effort facility, so memory_order_relaxed is sufficient.
    return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
  }

  template <typename Visitor>
  static Status TimedFullMergeCommonImpl(
      const MergeOperator* merge_operator, const Slice& key,
      MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
      const std::vector<Slice>& operands, Logger* logger,
      Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
      MergeOperator::OpFailureScope* op_failure_scope, Visitor&& visitor);

  // Variant that exposes the merge result directly (in serialized form for wide
  // columns) as well as its value type. Used by iterator and compaction.
  static Status TimedFullMergeImpl(
      const MergeOperator* merge_operator, const Slice& key,
      MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
      const std::vector<Slice>& operands, Logger* logger,
      Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
      MergeOperator::OpFailureScope* op_failure_scope, std::string* result,
      Slice* result_operand, ValueType* result_type);

  // Variant that exposes the merge result translated into the form requested by
  // the client. (For example, if the result is a wide-column structure but the
  // client requested the results in plain-value form, the value of the default
  // column is returned.) Used by point lookups.
  static Status TimedFullMergeImpl(
      const MergeOperator* merge_operator, const Slice& key,
      MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
      const std::vector<Slice>& operands, Logger* logger,
      Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
      MergeOperator::OpFailureScope* op_failure_scope,
      std::string* result_value, PinnableWideColumns* result_entity);
};

// MergeOutputIterator can be used to iterate over the result of a merge.
class MergeOutputIterator {
 public:
  // The MergeOutputIterator is bound to a MergeHelper instance.
  explicit MergeOutputIterator(const MergeHelper* merge_helper);

  // Seeks to the first record in the output.
  void SeekToFirst();
  // Advances to the next record in the output.
  void Next();

  Slice key() { return Slice(*it_keys_); }
  Slice value() { return Slice(*it_values_); }
  bool Valid() { return it_keys_ != merge_helper_->keys().rend(); }

 private:
  const MergeHelper* merge_helper_;
  std::deque<std::string>::const_reverse_iterator it_keys_;
  std::vector<Slice>::const_reverse_iterator it_values_;
};

}  // namespace ROCKSDB_NAMESPACE
back to top