Revision 3df07d1703297cf8bf98327c1dc65f48d18957f5 authored by sdong on 19 May 2014, 17:40:18 UTC, committed by sdong on 19 May 2014, 18:52:12 UTC
Summary:
Add a feature to decrease the number of threads in thread pool.
Also instantly schedule more threads if number of threads is increased.

Here is how it is implemented: each background thread needs to know its own thread ID. After the number of threads is decreased, all threads are woken up. The thread with the largest thread ID will terminate. If there are more threads to terminate, that thread will wake up all threads again.

Another change is made so that when the number of threads is increased, more threads are created and all previously excess threads are woken up to do the work.

Test Plan: Add a unit test.

Reviewers: haobo, dhruba

Reviewed By: haobo

CC: yhchiang, igor, nkg-, leveldb

Differential Revision: https://reviews.facebook.net/D18675
1 parent 1e56045
Raw File
merge_helper.h
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
#ifndef MERGE_HELPER_H
#define MERGE_HELPER_H

#include "db/dbformat.h"
#include "rocksdb/slice.h"
#include <string>
#include <deque>

namespace rocksdb {

class Comparator;
class Iterator;
class Logger;
class MergeOperator;
class Statistics;

// Collects the outcome of merging a run of merge-type entries read from an
// Iterator. The results of the most recent MergeUntil() call are kept in an
// internal scratch area and are exposed through the accessors below.
class MergeHelper {
 public:
  // Stores the given collaborators and settings; no merging happens here.
  // The pointers are kept as raw pointers and this class declares no
  // destructor, so it does not release them.
  // NOTE(review): presumably the pointees must outlive this helper -- confirm
  // against callers.
  //
  // user_comparator:            comparator for user keys.
  // user_merge_operator:        merge operator supplied by the user.
  // logger:                     logger handed to the merge implementation.
  // min_partial_merge_operands: threshold used by the implementation of
  //                             MergeUntil (presumably the minimum number of
  //                             operands before a partial merge is attempted;
  //                             verify in the .cc file).
  // assert_valid_internal_key:  whether internal key corruption should be
  //                             enforced against.
  MergeHelper(const Comparator* user_comparator,
              const MergeOperator* user_merge_operator, Logger* logger,
              unsigned min_partial_merge_operands,
              bool assert_valid_internal_key)
      : user_comparator_(user_comparator),
        user_merge_operator_(user_merge_operator),
        logger_(logger),
        min_partial_merge_operands_(min_partial_merge_operands),
        assert_valid_internal_key_(assert_valid_internal_key),
        keys_(),
        operands_(),
        success_(false) {}

  // Merge entries until we hit
  //     - a corrupted key
  //     - a Put/Delete,
  //     - a different user key,
  //     - a specific sequence number (snapshot boundary),
  //  or - the end of iteration
  // iter: (IN)  points to the first merge type entry
  //       (OUT) points to the first entry not included in the merge process
  // stop_before: (IN) a sequence number that merge should not cross.
  //                   0 means no restriction
  // at_bottom:   (IN) true if the iterator covers the bottom level, which means
  //                   we could reach the start of the history of this user key.
  // stats:       (IN) optional; defaults to nullptr.
  // steps:       optional; defaults to nullptr. Its exact meaning is defined
  //              by the implementation (presumably an OUT count of iterator
  //              steps taken -- TODO confirm).
  void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0,
                  bool at_bottom = false, Statistics* stats = nullptr,
                  int* steps = nullptr);

  // Query the merge result.
  // These are valid until the next MergeUntil call.
  //
  // If the merging was successful:
  //     - IsSuccess() will be true
  //     - key() will have the latest sequence number of the merges.
  //             The type will be Put or Merge. See IMPORTANT 1 note, below.
  //     - value() will be the result of merging all the operands together
  //     - The user should ignore keys() and values().
  //
  //   IMPORTANT 1: the key type could change after the MergeUntil call.
  //        Put/Delete + Merge + ... + Merge => Put
  //        Merge + ... + Merge => Merge
  //
  // If the merge operator is not associative, and if a Put/Delete is not found
  // then the merging will be unsuccessful. In this case:
  //     - IsSuccess() will be false
  //     - keys() contains the list of internal keys seen in order of iteration.
  //     - values() contains the list of values (merges) seen in the same order.
  //                values() is parallel to keys() so that the first entry in
  //                keys() is the key associated with the first entry in
  //                values() and so on. These lists will be the same length.
  //                All of these pairs will be merges over the same user key.
  //                See IMPORTANT 2 note below.
  //     - The user should ignore key() and value().
  //
  //   IMPORTANT 2: The entries were traversed in order from BACK to FRONT.
  //                So keys().back() was the first key seen by iterator.
  //
  // All accessors are read-only and therefore const-qualified, so they can be
  // used through a const MergeHelper reference or pointer. key()/value()
  // assert success, keys()/values() assert failure; calling the wrong pair is
  // a programming error.
  bool IsSuccess() const { return success_; }

  Slice key() const {
    assert(success_);
    return Slice(keys_.back());
  }

  Slice value() const {
    assert(success_);
    return Slice(operands_.back());
  }

  const std::deque<std::string>& keys() const {
    assert(!success_);
    return keys_;
  }

  const std::deque<std::string>& values() const {
    assert(!success_);
    return operands_;
  }

 private:
  const Comparator* user_comparator_;
  const MergeOperator* user_merge_operator_;
  Logger* logger_;
  unsigned min_partial_merge_operands_;
  bool assert_valid_internal_key_;  // enforce no internal key corruption?

  // the scratch area that holds the result of MergeUntil
  // valid up to the next MergeUntil call
  std::deque<std::string> keys_;      // Keeps track of the sequence of keys seen
  std::deque<std::string> operands_;  // Parallel with keys_; stores the values
  bool success_;  // result flag reported by IsSuccess(); starts out false
};

} // namespace rocksdb

#endif
back to top