https://github.com/facebook/rocksdb
Raw File
Tip revision: 6e2b5809f660c2340bb4fd542995e82c75b0a4d4 authored by Mayank Agarwal on 12 September 2013, 04:58:07 UTC
Updating readme file for version 2.3
Tip revision: 6e2b580
transaction_log_impl.cc
#include "db/transaction_log_impl.h"
#include "db/write_batch_internal.h"

namespace leveldb {

TransactionLogIteratorImpl::TransactionLogIteratorImpl(
                           const std::string& dbname,
                           const Options* options,
                           const EnvOptions& soptions,
                           const SequenceNumber seq,
                           std::unique_ptr<VectorLogPtr> files,
                           SequenceNumber const * const lastFlushedSequence) :
    dbname_(dbname),
    options_(options),
    soptions_(soptions),
    startingSequenceNumber_(seq),
    files_(std::move(files)),
    started_(false),
    isValid_(false),
    currentFileIndex_(0),
    lastFlushedSequence_(lastFlushedSequence) {
  assert(startingSequenceNumber_ <= *lastFlushedSequence_);
  assert(files_.get() != nullptr);

  reporter_.env = options_->env;
  reporter_.info_log = options_->info_log.get();
}

Status TransactionLogIteratorImpl::OpenLogFile(
  const LogFile* logFile,
  unique_ptr<SequentialFile>* file) {
  Env* env = options_->env;
  if (logFile->Type() == kArchivedLogFile) {
    std::string fname = ArchivedLogFileName(dbname_, logFile->LogNumber());
    return env->NewSequentialFile(fname, file, soptions_);
  } else {
    std::string fname = LogFileName(dbname_, logFile->LogNumber());
    Status status = env->NewSequentialFile(fname, file, soptions_);
    if (!status.ok()) {
      //  If cannot open file in DB directory.
      //  Try the archive dir, as it could have moved in the meanwhile.
      fname = ArchivedLogFileName(dbname_, logFile->LogNumber());
      status = env->NewSequentialFile(fname, file, soptions_);
      if (!status.ok()) {
        return Status::IOError(" Requested file not present in the dir");
      }
    }
    return status;
  }
}

BatchResult TransactionLogIteratorImpl::GetBatch()  {
  assert(isValid_);  //  cannot call in a non valid state.
  BatchResult result;
  result.sequence = currentSequence_;
  result.writeBatchPtr = std::move(currentBatch_);
  return result;
}

Status TransactionLogIteratorImpl::status() {
  return currentStatus_;
}

bool TransactionLogIteratorImpl::Valid() {
  return started_ && isValid_;
}

void TransactionLogIteratorImpl::Next() {
  LogFile* currentLogFile = files_.get()->at(currentFileIndex_).get();

//  First seek to the given seqNo. in the current file.
  std::string scratch;
  Slice record;
  if (!started_) {
    started_ = true;  // this piece only runs onced.
    isValid_ = false;
    if (startingSequenceNumber_ > *lastFlushedSequence_) {
      currentStatus_ = Status::IOError("Looking for a sequence, "
                                        "which is not flushed yet.");
      return;
    }
    Status s = OpenLogReader(currentLogFile);
    if (!s.ok()) {
      currentStatus_ = s;
      isValid_ = false;
      return;
    }
    while (currentLogReader_->ReadRecord(&record, &scratch)) {
      if (record.size() < 12) {
        reporter_.Corruption(
          record.size(), Status::Corruption("log record too small"));
        continue;
      }
      UpdateCurrentWriteBatch(record);
      if (currentSequence_ >= startingSequenceNumber_) {
        assert(currentSequence_ <= *lastFlushedSequence_);
        isValid_ = true;
        break;
      } else {
        isValid_ = false;
      }
    }
    if (isValid_) {
      // Done for this iteration
      return;
    }
  }
  bool openNextFile = true;
  while(openNextFile) {
    assert(currentLogReader_);
    if (currentSequence_ < *lastFlushedSequence_) {
      if (currentLogReader_->IsEOF()) {
        currentLogReader_->UnmarkEOF();
      }
      while (currentLogReader_->ReadRecord(&record, &scratch)) {
        if (record.size() < 12) {
          reporter_.Corruption(
            record.size(), Status::Corruption("log record too small"));
          continue;
        } else {
          UpdateCurrentWriteBatch(record);
          openNextFile = false;
          break;
        }
      }
    }

    if (openNextFile) {
      if (currentFileIndex_ < files_.get()->size() - 1) {
        ++currentFileIndex_;
        Status status =OpenLogReader(files_.get()->at(currentFileIndex_).get());
        if (!status.ok()) {
          isValid_ = false;
          currentStatus_ = status;
          return;
        }
      } else {
        isValid_ = false;
        openNextFile = false;
        if (currentSequence_ == *lastFlushedSequence_) {
          currentStatus_ = Status::OK();
        } else {
          currentStatus_ = Status::IOError(" NO MORE DATA LEFT");
        }
      }
    }
  }
}

void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
  WriteBatch* batch = new WriteBatch();
  WriteBatchInternal::SetContents(batch, record);
  currentSequence_ = WriteBatchInternal::Sequence(batch);
  currentBatch_.reset(batch);
  isValid_ = true;
  currentStatus_ = Status::OK();
}

Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
  unique_ptr<SequentialFile> file;
  Status status = OpenLogFile(logFile, &file);
  if (!status.ok()) {
    return status;
  }
  assert(file);
  currentLogReader_.reset(
    new log::Reader(std::move(file), &reporter_, true, 0)
  );
  return Status::OK();
}
}  //  namespace leveldb
back to top