//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include "db/transaction_log_impl.h"
#include <inttypes.h>
#include "db/write_batch_internal.h"
#include "util/file_reader_writer.h"

namespace rocksdb {
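// This iterator is handed out by the DB implementation in response to
// DB::GetUpdatesSince(). A rough caller-side sketch (assumed usage, not code
// from this file):
//
//   std::unique_ptr<TransactionLogIterator> iter;
//   Status s = db->GetUpdatesSince(start_seq, &iter);
//   while (s.ok() && iter->Valid()) {
//     BatchResult res = iter->GetBatch();  // res.sequence is the sequence
//                                          // number of the first update in
//                                          // res.writeBatchPtr
//     iter->Next();
//   }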

TransactionLogIteratorImpl::TransactionLogIteratorImpl(
    const std::string& dir, const ImmutableDBOptions* options,
    const TransactionLogIterator::ReadOptions& read_options,
    const EnvOptions& soptions, const SequenceNumber seq,
    std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
    const bool seq_per_batch)
    : dir_(dir),
      options_(options),
      read_options_(read_options),
      soptions_(soptions),
      startingSequenceNumber_(seq),
      files_(std::move(files)),
      started_(false),
      isValid_(false),
      currentFileIndex_(0),
      currentBatchSeq_(0),
      currentLastSeq_(0),
      versions_(versions),
      seq_per_batch_(seq_per_batch) {
  assert(files_ != nullptr);
  assert(versions_ != nullptr);

  reporter_.env = options_->env;
  reporter_.info_log = options_->info_log.get();
  SeekToStartSequence();  // Seek to the starting sequence
}

Status TransactionLogIteratorImpl::OpenLogFile(
    const LogFile* logFile, unique_ptr<SequentialFileReader>* file_reader) {
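  // Open the WAL file for sequential reads and wrap it in a
  // SequentialFileReader. Archived files live in the archive directory; live
  // files are looked up in the DB directory first.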
  Env* env = options_->env;
  unique_ptr<SequentialFile> file;
  Status s;
  EnvOptions optimized_env_options = env->OptimizeForLogRead(soptions_);
  if (logFile->Type() == kArchivedLogFile) {
    std::string fname = ArchivedLogFileName(dir_, logFile->LogNumber());
    s = env->NewSequentialFile(fname, &file, optimized_env_options);
  } else {
    std::string fname = LogFileName(dir_, logFile->LogNumber());
    s = env->NewSequentialFile(fname, &file, optimized_env_options);
    if (!s.ok()) {
      //  The file could not be opened in the DB directory. Try the archive
      //  directory, since the file may have been moved there in the meantime.
      fname = ArchivedLogFileName(dir_, logFile->LogNumber());
      s = env->NewSequentialFile(fname, &file, optimized_env_options);
    }
  }
  if (s.ok()) {
    file_reader->reset(new SequentialFileReader(std::move(file)));
  }
  return s;
}

BatchResult TransactionLogIteratorImpl::GetBatch()  {
  assert(isValid_);  //  Cannot be called in an invalid state.
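  // The current batch is moved into the result, so it can be retrieved only
  // once for each iterator position.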
  BatchResult result;
  result.sequence = currentBatchSeq_;
  result.writeBatchPtr = std::move(currentBatch_);
  return result;
}

Status TransactionLogIteratorImpl::status() {
  return currentStatus_;
}

bool TransactionLogIteratorImpl::Valid() {
  return started_ && isValid_;
}

bool TransactionLogIteratorImpl::RestrictedRead(
    Slice* record,
    std::string* scratch) {
  // Don't read if no more complete entries to read from logs
  if (currentLastSeq_ >= versions_->LastSequence()) {
    return false;
  }
  return currentLogReader_->ReadRecord(record, scratch);
}

void TransactionLogIteratorImpl::SeekToStartSequence(
    uint64_t startFileIndex,
    bool strict) {
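  // Scan forward from files_[startFileIndex] until a batch is found whose
  // last sequence number reaches startingSequenceNumber_. In strict mode the
  // batch must begin exactly at startingSequenceNumber_, otherwise the
  // iterator is left in a corrupted state.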
  std::string scratch;
  Slice record;
  started_ = false;
  isValid_ = false;
  if (files_->size() <= startFileIndex) {
    return;
  }
  Status s = OpenLogReader(files_->at(startFileIndex).get());
  if (!s.ok()) {
    currentStatus_ = s;
    reporter_.Info(currentStatus_.ToString().c_str());
    return;
  }
  while (RestrictedRead(&record, &scratch)) {
    if (record.size() < WriteBatchInternal::kHeader) {
      reporter_.Corruption(
        record.size(), Status::Corruption("very small log record"));
      continue;
    }
    UpdateCurrentWriteBatch(record);
    if (currentLastSeq_ >= startingSequenceNumber_) {
      if (strict && currentBatchSeq_ != startingSequenceNumber_) {
        currentStatus_ = Status::Corruption("Gap in sequence number. Could not "
                                            "seek to required sequence number");
        reporter_.Info(currentStatus_.ToString().c_str());
        return;
      } else if (strict) {
        reporter_.Info("Could seek to the required sequence number. Iterator "
                       "will continue.");
      }
      isValid_ = true;
      started_ = true;  // We could seek to the starting sequence
      return;
    } else {
      isValid_ = false;
    }
  }

  // The start sequence was not found in the file we scanned above. Normally
  // this should be the only file; otherwise log the error and let the
  // iterator move on to the next available entry.
  // If strict is set, we must seek exactly to the start sequence, and it
  // should have been present in that file.
  if (strict) {
    currentStatus_ = Status::Corruption("Gap in sequence number. Could not "
                                        "seek to required sequence number");
    reporter_.Info(currentStatus_.ToString().c_str());
  } else if (files_->size() != 1) {
    currentStatus_ = Status::Corruption("Start sequence was not found, "
                                        "skipping to the next available");
    reporter_.Info(currentStatus_.ToString().c_str());
    // Let NextImpl find the next available entry. started_ remains false
    // because we don't want to check for gaps while moving to start sequence
    NextImpl(true);
  }
}

void TransactionLogIteratorImpl::Next() {
  return NextImpl(false);
}

void TransactionLogIteratorImpl::NextImpl(bool internal) {
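  // Advance to the next write batch. internal is true when called from
  // SeekToStartSequence(); in that case started_ is set once a batch has
  // been read successfully.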
  std::string scratch;
  Slice record;
  isValid_ = false;
  if (!internal && !started_) {
    // Keep retrying the seek to the start sequence until it succeeds
    return SeekToStartSequence();
  }
  while (true) {
    assert(currentLogReader_);
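    // If the reader previously hit EOF, clear the flag so that records
    // appended to the live WAL since the last read become visible.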
    if (currentLogReader_->IsEOF()) {
      currentLogReader_->UnmarkEOF();
    }
    while (RestrictedRead(&record, &scratch)) {
      if (record.size() < WriteBatchInternal::kHeader) {
        reporter_.Corruption(
          record.size(), Status::Corruption("very small log record"));
        continue;
      } else {
        // started_ should be true if called by application
        assert(internal || started_);
        // started_ should be false if called internally
        assert(!internal || !started_);
        UpdateCurrentWriteBatch(record);
        if (internal && !started_) {
          started_ = true;
        }
        return;
      }
    }

    // Open the next file
    if (currentFileIndex_ < files_->size() - 1) {
      ++currentFileIndex_;
      Status s = OpenLogReader(files_->at(currentFileIndex_).get());
      if (!s.ok()) {
        isValid_ = false;
        currentStatus_ = s;
        return;
      }
    } else {
      isValid_ = false;
      if (currentLastSeq_ == versions_->LastSequence()) {
        currentStatus_ = Status::OK();
      } else {
        currentStatus_ = Status::Corruption("NO MORE DATA LEFT");
      }
      return;
    }
  }
}

bool TransactionLogIteratorImpl::IsBatchExpected(
    const WriteBatch* batch,
    const SequenceNumber expectedSeq) {
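  // Check that the batch begins at the sequence number we expect; log the
  // discontinuity and return false otherwise so that the caller can reseek.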
  assert(batch);
  SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch);
  if (batchSeq != expectedSeq) {
    char buf[200];
    snprintf(buf, sizeof(buf),
             "Discontinuity in log records. Got seq=%" PRIu64
             ", Expected seq=%" PRIu64 ", Last flushed seq=%" PRIu64
             ". Log iterator will reseek to the correct batch.",
             batchSeq, expectedSeq, versions_->LastSequence());
    reporter_.Info(buf);
    return false;
  }
  return true;
}

void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
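  // Materialize the log record into a WriteBatch, detect gaps in the
  // sequence numbers (reseeking if one is found), and update the current
  // batch and sequence state.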
  std::unique_ptr<WriteBatch> batch(new WriteBatch());
  WriteBatchInternal::SetContents(batch.get(), record);

  SequenceNumber expectedSeq = currentLastSeq_ + 1;
  // If the iterator has started, then confirm that we get continuous batches
  if (started_ && !IsBatchExpected(batch.get(), expectedSeq)) {
    // Seek to the batch having expected sequence number
    if (expectedSeq < files_->at(currentFileIndex_)->StartSequence()) {
      // Expected batch must lie in the previous log file
      // Avoid underflow.
      if (currentFileIndex_ != 0) {
        currentFileIndex_--;
      }
    }
    startingSequenceNumber_ = expectedSeq;
    // currentStatus_ will be set to Ok if reseek succeeds
    // Note: this is still ok in seq_per_batch_ && two_write_queues_ mode
    // that allows gaps in the WAL since it will still skip over the gap.
    currentStatus_ = Status::NotFound("Gap in sequence numbers");
    // In seq_per_batch_ mode, gaps in the seq are possible so the strict mode
    // should be disabled
    return SeekToStartSequence(currentFileIndex_, !seq_per_batch_);
  }

  struct BatchCounter : public WriteBatch::Handler {
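    // In seq_per_batch_ mode, sequence numbers are consumed per batch rather
    // than per key, so only the batch boundary markers (Noop for a non-empty
    // batch, EndPrepare, Commit) advance the count; the data callbacks below
    // are no-ops.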
    SequenceNumber sequence_;
    explicit BatchCounter(SequenceNumber sequence) : sequence_(sequence) {}
    Status MarkNoop(bool empty_batch) override {
      if (!empty_batch) {
        sequence_++;
      }
      return Status::OK();
    }
    Status MarkEndPrepare(const Slice&) override {
      sequence_++;
      return Status::OK();
    }
    Status MarkCommit(const Slice&) override {
      sequence_++;
      return Status::OK();
    }

    Status PutCF(uint32_t /*cf*/, const Slice& /*key*/,
                 const Slice& /*val*/) override {
      return Status::OK();
    }
    Status DeleteCF(uint32_t /*cf*/, const Slice& /*key*/) override {
      return Status::OK();
    }
    Status SingleDeleteCF(uint32_t /*cf*/, const Slice& /*key*/) override {
      return Status::OK();
    }
    Status MergeCF(uint32_t /*cf*/, const Slice& /*key*/,
                   const Slice& /*val*/) override {
      return Status::OK();
    }
    Status MarkBeginPrepare() override { return Status::OK(); }
    Status MarkRollback(const Slice&) override { return Status::OK(); }
  };

  currentBatchSeq_ = WriteBatchInternal::Sequence(batch.get());
  if (seq_per_batch_) {
    BatchCounter counter(currentBatchSeq_);
    batch->Iterate(&counter);
    currentLastSeq_ = counter.sequence_;
  } else {
    currentLastSeq_ =
        currentBatchSeq_ + WriteBatchInternal::Count(batch.get()) - 1;
  }
  // currentBatchSeq_ can only change here
  assert(currentLastSeq_ <= versions_->LastSequence());

  currentBatch_ = std::move(batch);
  isValid_ = true;
  currentStatus_ = Status::OK();
}

Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
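  // Open the log file and wrap it in a log::Reader that reports read errors
  // and corruption through reporter_.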
  unique_ptr<SequentialFileReader> file;
  Status s = OpenLogFile(logFile, &file);
  if (!s.ok()) {
    return s;
  }
  assert(file);
  currentLogReader_.reset(new log::Reader(
      options_->info_log, std::move(file), &reporter_,
      read_options_.verify_checksums_, 0, logFile->LogNumber()));
  return Status::OK();
}
}  //  namespace rocksdb
#endif  // ROCKSDB_LITE