Revision f5cdf931a032a93688eb26532c4153b6950fd640 authored by Abhishek Kona on 13 November 2012, 00:45:45 UTC, committed by Abhishek Kona on 16 November 2012, 17:09:00 UTC
Summary:
Add option to read WAL and print a summary for each record.
facebook task => #1885013

E.G. Output :
./ldb dump_wal --walfile=/tmp/leveldbtest-5907/dbbench/026122.log --header
Sequence,Count,ByteSize
49981,1,100033
49981,1,100033
49982,1,100033
49981,1,100033
49982,1,100033
49983,1,100033
49981,1,100033
49982,1,100033
49983,1,100033
49984,1,100033
49981,1,100033
49982,1,100033

Test Plan:
Works run
./ldb read_wal --wal-file=/tmp/leveldbtest-5907/dbbench/000078.log --header

Reviewers: dhruba, heyongqiang

Reviewed By: dhruba

CC: emayanke, leveldb, zshao

Differential Revision: https://reviews.facebook.net/D6675
1 parent e988c11
Raw File
filename.cc
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <ctype.h>
#include <stdio.h>
#include "db/filename.h"
#include "db/dbformat.h"
#include "leveldb/env.h"
#include "util/logging.h"
#include <iostream>

namespace leveldb {

// Given a path, flatten the path name by replacing all chars not in
// {[0-9,a-z,A-Z,-,_,.]} with _. And append '\0' at the end.
// Return the number of chars stored in dest not including the trailing '\0'.
static int FlattenPath(const std::string& path, char* dest, int len) {
  int write_idx = 0;
  int i = 0;
  int src_len = path.size();

  while (i < src_len && write_idx < len - 1) {
    if ((path[i] >= 'a' && path[i] <= 'z') ||
        (path[i] >= '0' && path[i] <= '9') ||
        (path[i] >= 'A' && path[i] <= 'Z') ||
        path[i] == '-' ||
        path[i] == '.' ||
        path[i] == '_'){
      dest[write_idx++] = path[i];
    } else {
      if (i > 0)
        dest[write_idx++] = '_';
    }
    i++;
  }

  dest[write_idx] = '\0';
  return write_idx;
}

// A utility routine: write "data" to the named file and Sync() it.
extern Status WriteStringToFileSync(Env* env, const Slice& data,
                                    const std::string& fname);

static std::string MakeFileName(const std::string& name, uint64_t number,
                                const char* suffix) {
  char buf[100];
  snprintf(buf, sizeof(buf), "/%06llu.%s",
           static_cast<unsigned long long>(number),
           suffix);
  return name + buf;
}

std::string LogFileName(const std::string& name, uint64_t number) {
  assert(number > 0);
  return MakeFileName(name, number, "log");
}

std::string TableFileName(const std::string& name, uint64_t number) {
  assert(number > 0);
  return MakeFileName(name, number, "sst");
}

std::string DescriptorFileName(const std::string& dbname, uint64_t number) {
  assert(number > 0);
  char buf[100];
  snprintf(buf, sizeof(buf), "/MANIFEST-%06llu",
           static_cast<unsigned long long>(number));
  return dbname + buf;
}

std::string CurrentFileName(const std::string& dbname) {
  return dbname + "/CURRENT";
}

std::string LockFileName(const std::string& dbname) {
  return dbname + "/LOCK";
}

std::string TempFileName(const std::string& dbname, uint64_t number) {
  assert(number > 0);
  return MakeFileName(dbname, number, "dbtmp");
}

std::string InfoLogFileName(const std::string& dbname,
    const std::string& db_path, const std::string& log_dir) {
  if (log_dir.empty())
    return dbname + "/LOG";

  char flatten_db_path[256];
  FlattenPath(db_path, flatten_db_path, 256);
  return log_dir + "/" + flatten_db_path + "_LOG";
}

// Return the name of the old info log file for "dbname".
std::string OldInfoLogFileName(const std::string& dbname, uint64_t ts,
    const std::string& db_path, const std::string& log_dir) {
  char buf[50];
  snprintf(buf, sizeof(buf), "%llu", static_cast<unsigned long long>(ts));

  if (log_dir.empty())
    return dbname + "/LOG.old." + buf;

  char flatten_db_path[256];
  FlattenPath(db_path, flatten_db_path, 256);
  return log_dir + "/" + flatten_db_path + "_LOG.old." + buf;
}


// Owned filenames have the form:
//    dbname/CURRENT
//    dbname/LOCK
//    dbname/LOG
//    dbname/LOG.old.[0-9]+
//    dbname/MANIFEST-[0-9]+
//    dbname/[0-9]+.(log|sst)
bool ParseFileName(const std::string& fname,
                   uint64_t* number,
                   FileType* type) {
  Slice rest(fname);
  if (rest == "CURRENT") {
    *number = 0;
    *type = kCurrentFile;
  } else if (rest == "LOCK") {
    *number = 0;
    *type = kDBLockFile;
  } else if (rest == "LOG" || rest == "LOG.old") {
    *number = 0;
    *type = kInfoLogFile;
  } else if (rest.starts_with("LOG.old.")) {
    uint64_t ts_suffix;
    rest.remove_prefix(sizeof("LOG.old."));
    if (!ConsumeDecimalNumber(&rest, &ts_suffix)) {
      return false;
    }
    *number = ts_suffix;
    *type = kInfoLogFile;
  } else if (rest.starts_with("MANIFEST-")) {
    rest.remove_prefix(strlen("MANIFEST-"));
    uint64_t num;
    if (!ConsumeDecimalNumber(&rest, &num)) {
      return false;
    }
    if (!rest.empty()) {
      return false;
    }
    *type = kDescriptorFile;
    *number = num;
  } else {
    // Avoid strtoull() to keep filename format independent of the
    // current locale
    uint64_t num;
    if (!ConsumeDecimalNumber(&rest, &num)) {
      return false;
    }
    Slice suffix = rest;
    if (suffix == Slice(".log")) {
      *type = kLogFile;
    } else if (suffix == Slice(".sst")) {
      *type = kTableFile;
    } else if (suffix == Slice(".dbtmp")) {
      *type = kTempFile;
    } else {
      return false;
    }
    *number = num;
  }
  return true;
}

Status SetCurrentFile(Env* env, const std::string& dbname,
                      uint64_t descriptor_number) {
  // Remove leading "dbname/" and add newline to manifest file name
  std::string manifest = DescriptorFileName(dbname, descriptor_number);
  Slice contents = manifest;
  assert(contents.starts_with(dbname + "/"));
  contents.remove_prefix(dbname.size() + 1);
  std::string tmp = TempFileName(dbname, descriptor_number);
  Status s = WriteStringToFileSync(env, contents.ToString() + "\n", tmp);
  if (s.ok()) {
    s = env->RenameFile(tmp, CurrentFileName(dbname));
  }
  if (!s.ok()) {
    env->DeleteFile(tmp);
  }
  return s;
}

}  // namespace leveldb
back to top