https://github.com/facebook/rocksdb
Raw File
Tip revision: 0a501872e536f8630dda7f2567a15f8a9a12c669 authored by sdong on 31 October 2019, 23:39:02 UTC
Fix two odd assertions.
Tip revision: 0a50187
db_repl_stress.cc
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
#include <cstdio>

#include "db/write_batch_internal.h"
#include "rocksdb/db.h"
#include "rocksdb/types.h"
#include "port/atomic_pointer.h"
#include "util/testutil.h"


// Run a thread to perform Put's.
// Another thread uses GetUpdatesSince API to keep getting the updates.
// options :
// --num_inserts = the num of inserts the first thread should perform.
// --wal_ttl = the wal ttl for the run.

using namespace rocksdb;

struct DataPumpThread {
  size_t no_records;
  DB* db; // Assumption DB is Open'ed already.
};

static std::string RandomString(Random* rnd, int len) {
  std::string r;
  test::RandomString(rnd, len, &r);
  return r;
}

static void DataPumpThreadBody(void* arg) {
  DataPumpThread* t = reinterpret_cast<DataPumpThread*>(arg);
  DB* db = t->db;
  Random rnd(301);
  size_t i = 0;
  while(i++ < t->no_records) {
    if(!db->Put(WriteOptions(), Slice(RandomString(&rnd, 500)),
                Slice(RandomString(&rnd, 500))).ok()) {
      fprintf(stderr, "Error in put\n");
      exit(1);
    }
  }
}

struct ReplicationThread {
  port::AtomicPointer stop;
  DB* db;
  volatile size_t no_read;
};

static void ReplicationThreadBody(void* arg) {
  ReplicationThread* t = reinterpret_cast<ReplicationThread*>(arg);
  DB* db = t->db;
  unique_ptr<TransactionLogIterator> iter;
  SequenceNumber currentSeqNum = 1;
  while (t->stop.Acquire_Load() != nullptr) {
    iter.reset();
    Status s;
    while(!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
      if (t->stop.Acquire_Load() == nullptr) {
        return;
      }
    }
    fprintf(stderr, "Refreshing iterator\n");
    for(;iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) {
      BatchResult res = iter->GetBatch();
      if (res.sequence != currentSeqNum) {
        fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n", currentSeqNum,
                        res.sequence);
        exit(1);
      }
    }
  }
}

int main(int argc, const char** argv) {

  uint64_t FLAGS_num_inserts = 1000;
  uint64_t FLAGS_WAL_ttl_seconds = 1000;
  char junk;
  long l;

  for (int i = 1; i < argc; ++i) {
    if (sscanf(argv[i], "--num_inserts=%ld%c", &l, &junk) == 1) {
      FLAGS_num_inserts = l;
    } else if (sscanf(argv[i], "--wal_ttl=%ld%c", &l, &junk) == 1) {
      FLAGS_WAL_ttl_seconds = l;
    } else {
      fprintf(stderr, "Invalid Flag '%s'\n", argv[i]);
      exit(1);
    }
  }

  Env* env = Env::Default();
  std::string default_db_path;
  env->GetTestDirectory(&default_db_path);
  default_db_path += "db_repl_stress";
  Options options;
  options.create_if_missing = true;
  options.WAL_ttl_seconds = FLAGS_WAL_ttl_seconds;
  DB* db;
  DestroyDB(default_db_path, options);

  Status s = DB::Open(options, default_db_path, &db);

  if (!s.ok()) {
    fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str());
    exit(1);
  }

  DataPumpThread dataPump;
  dataPump.no_records = FLAGS_num_inserts;
  dataPump.db = db;
  env->StartThread(DataPumpThreadBody, &dataPump);

  ReplicationThread replThread;
  replThread.db = db;
  replThread.no_read = 0;
  replThread.stop.Release_Store(env); // store something to make it non-null.

  env->StartThread(ReplicationThreadBody, &replThread);
  while(replThread.no_read < FLAGS_num_inserts);
  replThread.stop.Release_Store(nullptr);
  if (replThread.no_read < dataPump.no_records) {
    // no. read should be => than inserted.
    fprintf(stderr, "No. of Record's written and read not same\nRead : %ld"
            " Written : %ld\n", replThread.no_read, dataPump.no_records);
    exit(1);
  }
  fprintf(stderr, "Successful!\n");
  exit(0);
}
back to top