Revision 3df07d1703297cf8bf98327c1dc65f48d18957f5 authored by sdong on 19 May 2014, 17:40:18 UTC, committed by sdong on 19 May 2014, 18:52:12 UTC
Summary:
Add a feature to decrease the number of threads in thread pool.
Also instantly schedule more threads if number of threads is increased.

Here is the way it is implemented: each background thread needs its thread ID. After decreasing number of threads, all threads are woken up. The thread with the largest thread ID will terminate. If there are more threads to terminate, the thread will wake up all threads again.

Another change is made so that when number of threads is increased, more threads are created and all previous excessive threads are woken up to do the work.

Test Plan: Add a unit test.

Reviewers: haobo, dhruba

Reviewed By: haobo

CC: yhchiang, igor, nkg-, leveldb

Differential Revision: https://reviews.facebook.net/D18675
1 parent 1e56045
Raw File
log_and_apply_bench.cc
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.


#include <vector>

#include "util/testharness.h"
#include "util/benchharness.h"
#include "db/version_set.h"
#include "util/mutexlock.h"

namespace rocksdb {

std::string MakeKey(unsigned int num) {
  char buf[30];
  snprintf(buf, sizeof(buf), "%016u", num);
  return std::string(buf);
}

void BM_LogAndApply(int iters, int num_base_files) {
  VersionSet* vset;
  ColumnFamilyData* default_cfd;
  uint64_t fnum = 1;
  port::Mutex mu;
  MutexLock l(&mu);

  BENCHMARK_SUSPEND {
    std::string dbname = test::TmpDir() + "/rocksdb_test_benchmark";
    ASSERT_OK(DestroyDB(dbname, Options()));

    DB* db = nullptr;
    Options opts;
    opts.create_if_missing = true;
    Status s = DB::Open(opts, dbname, &db);
    ASSERT_OK(s);
    ASSERT_TRUE(db != nullptr);

    delete db;
    db = nullptr;

    Options options;
    EnvOptions sopt;
    vset = new VersionSet(dbname, &options, sopt, nullptr);
    std::vector<ColumnFamilyDescriptor> dummy;
    dummy.push_back(ColumnFamilyDescriptor());
    ASSERT_OK(vset->Recover(dummy));
    default_cfd = vset->GetColumnFamilySet()->GetDefault();
    VersionEdit vbase;
    for (int i = 0; i < num_base_files; i++) {
      InternalKey start(MakeKey(2 * fnum), 1, kTypeValue);
      InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion);
      vbase.AddFile(2, ++fnum, 1 /* file size */, start, limit, 1, 1);
    }
    ASSERT_OK(vset->LogAndApply(default_cfd, &vbase, &mu));
  }

  for (int i = 0; i < iters; i++) {
    VersionEdit vedit;
    vedit.DeleteFile(2, fnum);
    InternalKey start(MakeKey(2 * fnum), 1, kTypeValue);
    InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion);
    vedit.AddFile(2, ++fnum, 1 /* file size */, start, limit, 1, 1);
    vset->LogAndApply(default_cfd, &vedit, &mu);
  }
}

BENCHMARK_NAMED_PARAM(BM_LogAndApply, 1000_iters_1_file, 1000, 1)
BENCHMARK_NAMED_PARAM(BM_LogAndApply, 1000_iters_100_files, 1000, 100)
BENCHMARK_NAMED_PARAM(BM_LogAndApply, 1000_iters_10000_files, 1000, 10000)
BENCHMARK_NAMED_PARAM(BM_LogAndApply, 100_iters_100000_files, 100, 100000)

}  // namespace rocksdb

int main(int argc, char** argv) {
  rocksdb::benchmark::RunBenchmarks();
  return 0;
}
back to top