https://github.com/facebook/rocksdb
Raw File
Tip revision: 9b50106f9ae1163c2da55b479bc01035964ac392 authored by Mayank Agarwal on 18 October 2013, 21:50:54 UTC
Dbid feature
Tip revision: 9b50106
memtable.cc
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/memtable.h"

#include <memory>

#include "db/dbformat.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "util/coding.h"
#include "util/murmurhash.h"

namespace rocksdb {

// Constructs an empty memtable.
//
//   cmp            comparator for internal keys; copied/bound into comparator_
//   table_factory  factory used to create the underlying MemTableRep
//   numlevel       forwarded to the constructor of edit_
//   options        only options.arena_block_size is read here, to size the
//                  arena
//
// NOTE(review): table_ is built from comparator_ and &arena_impl_, so this
// presumably relies on both members being declared before table_ in the
// class definition -- confirm against the header.
MemTable::MemTable(const InternalKeyComparator& cmp,
                   std::shared_ptr<MemTableRepFactory> table_factory,
                   int numlevel,
                   const Options& options)
    : comparator_(cmp),
      refs_(0),  // no references yet; the destructor asserts this is 0 again
      arena_impl_(options.arena_block_size),
      // The rep is handed the arena so it can allocate from it.
      table_(table_factory->CreateMemTableRep(comparator_, &arena_impl_)),
      flush_in_progress_(false),
      flush_completed_(false),
      file_number_(0),
      edit_(numlevel),
      first_seqno_(0),  // 0 means "nothing inserted yet" (see Add())
      mem_next_logfile_number_(0),
      mem_logfile_number_(0) { }

// Destroys the memtable. Members release their own resources; the only
// explicit work is a sanity check on the reference count.
MemTable::~MemTable() {
  // No references may remain when the memtable is destroyed.
  assert(refs_ == 0);
}

// Returns an estimate of the memory used by this memtable: the arena's
// approximate usage plus whatever the underlying table rep reports.
size_t MemTable::ApproximateMemoryUsage() {
  const auto arena_bytes = arena_impl_.ApproximateMemoryUsage();
  const auto rep_bytes = table_->ApproximateMemoryUsage();
  return arena_bytes + rep_bytes;
}

// Orders two memtable entries. Both pointers refer to length-prefixed
// internal keys; the prefixes are stripped and the resulting slices are
// handed to the wrapped internal-key comparator.
int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr)
    const {
  return comparator.Compare(GetLengthPrefixedSlice(aptr),
                            GetLengthPrefixedSlice(bptr));
}

// Returns the user-key portion of a memtable entry.
//
// "key" points at a length-prefixed internal key. The internal key ends with
// an 8-byte trailer (written by MemTable::Add as a fixed64), which is dropped
// here. The returned slice aliases the entry's memory; nothing is copied.
Slice MemTableRep::UserKey(const char* key) const {
  const Slice internal_key = GetLengthPrefixedSlice(key);
  // size() is unsigned: an entry shorter than the trailer would wrap around
  // and yield a slice of enormous length, so catch that in debug builds.
  assert(internal_key.size() >= 8);
  return Slice(internal_key.data(), internal_key.size() - 8);
}

// Builds the length-prefixed encoding of "target" inside *scratch and
// returns a pointer to the first encoded byte.
//
// Any previous contents of *scratch are discarded. The returned pointer
// refers to storage owned by *scratch, so it is only usable while *scratch
// is alive and unmodified.
static const char* EncodeKey(std::string* scratch, const Slice& target) {
  const size_t target_len = target.size();
  scratch->clear();
  // varint32 length prefix, followed by the raw key bytes
  PutVarint32(scratch, target_len);
  scratch->append(target.data(), target_len);
  return scratch->data();
}

// Adapts a MemTableRep::Iterator to the generic Iterator interface.
//
// The rep iterator yields pointers to raw memtable entries (length-prefixed
// internal key followed by a length-prefixed value); this class decodes those
// entries into key and value slices.
class MemTableIterator: public Iterator {
 public:
  // Iterates using the rep's plain iterator.
  explicit MemTableIterator(MemTableRep* table)
      : iter_(table->GetIterator()) {
  }

  // Iterates using the rep's prefix iterator for *prefix. "prefix" is
  // dereferenced unconditionally, so it must not be null.
  MemTableIterator(MemTableRep* table, const Slice* prefix)
      : iter_(table->GetPrefixIterator(*prefix)) {
  }

  virtual bool Valid() const {
    return iter_->Valid();
  }

  // The rep iterator seeks on length-prefixed keys, so "k" is re-encoded
  // into tmp_ before being passed down.
  virtual void Seek(const Slice& k) {
    const char* encoded_target = EncodeKey(&tmp_, k);
    iter_->Seek(encoded_target);
  }

  virtual void SeekToFirst() {
    iter_->SeekToFirst();
  }

  virtual void SeekToLast() {
    iter_->SeekToLast();
  }

  virtual void Next() {
    iter_->Next();
  }

  virtual void Prev() {
    iter_->Prev();
  }

  // The internal key of the current entry, without its length prefix.
  virtual Slice key() const {
    const char* entry = iter_->key();
    return GetLengthPrefixedSlice(entry);
  }

  // The value of the current entry. It is stored as a second length-prefixed
  // string immediately after the internal key bytes.
  virtual Slice value() const {
    const Slice internal_key = GetLengthPrefixedSlice(iter_->key());
    const char* value_start = internal_key.data() + internal_key.size();
    return GetLengthPrefixedSlice(value_start);
  }

  // This iterator always reports OK.
  virtual Status status() const {
    return Status::OK();
  }

 private:
  std::shared_ptr<MemTableRep::Iterator> iter_;
  std::string tmp_;       // Scratch buffer handed to EncodeKey by Seek()

  // No copying allowed
  MemTableIterator(const MemTableIterator&);
  void operator=(const MemTableIterator&);
};

// Returns a newly allocated iterator over this memtable (created with
// "new"). When "prefix" is null a plain iterator is created; otherwise a
// prefix iterator for *prefix is created.
Iterator* MemTable::NewIterator(const Slice* prefix) {
  auto* rep = table_.get();
  if (prefix == nullptr) {
    return new MemTableIterator(rep);
  }
  return new MemTableIterator(rep, prefix);
}

// Appends one entry to the memtable.
//
// The entry is serialized into arena memory and then inserted into the
// underlying table rep. Serialized layout, in order:
//   varint32  length of the internal key (user key + 8-byte tag)
//   bytes     user key
//   fixed64   tag: (sequence number << 8) | value type
//   varint32  length of the value
//   bytes     value
void MemTable::Add(SequenceNumber s, ValueType type,
                   const Slice& key,
                   const Slice& value) {
  const size_t user_key_len = key.size();
  const size_t val_size = value.size();
  const size_t ikey_len = user_key_len + 8;  // user key plus the tag
  const size_t encoded_len = VarintLength(ikey_len) + ikey_len +
                             VarintLength(val_size) + val_size;

  char* buf = arena_impl_.Allocate(encoded_len);

  // Internal key: length prefix, user key bytes, then the packed tag.
  char* p = EncodeVarint32(buf, ikey_len);
  memcpy(p, key.data(), user_key_len);
  p += user_key_len;
  EncodeFixed64(p, (s << 8) | type);
  p += 8;

  // Value: length prefix, then the value bytes.
  p = EncodeVarint32(p, val_size);
  memcpy(p, value.data(), val_size);

  // Everything written must exactly fill the allocation.
  assert((p + val_size) - buf == (unsigned)encoded_len);
  table_->Insert(buf);

  // Remember the first sequence number inserted into the memtable. Every
  // later insert is expected to carry a larger sequence number.
  assert(first_seqno_ == 0 || s > first_seqno_);
  if (first_seqno_ == 0) {
    first_seqno_ = s;
  }
}

// Looks up the user key of "key" in this memtable.
//
// Entries for the user key are visited starting from the position found by
// seeking to key.memtable_key(), until a Put or Delete is found or the user
// key changes.
//
// Returns true when a Put or Delete entry was found:
//   - Put, no merge pending:    *value is set to the stored value, *s is OK.
//   - Put, merge pending:       *value receives FullMerge(stored value,
//                               *operands), *s is OK.
//   - Delete, no merge pending: *s is NotFound.
//   - Delete, merge pending:    *value receives FullMerge(nullptr,
//                               *operands), *s is OK.
//   If FullMerge fails, NUMBER_MERGE_FAILURES is ticked, *s is set to
//   Corruption, and true is still returned.
//
// Returns false when no Put/Delete was found. *value is left untouched. If a
// merge was already in progress on entry (s->IsMergeInProgress()) or merge
// operands were collected here, *s is set to MergeInProgress.
//
// Merge operands are pushed onto the front of *operands as they are
// encountered, and adjacent operands are collapsed with PartialMerge whenever
// the merge operator allows it. "operands" must not be null.
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
                   std::deque<std::string>* operands, const Options& options) {
  Slice memkey = key.memtable_key();
  std::shared_ptr<MemTableRep::Iterator> iter(
    table_->GetIterator(key.user_key()));
  iter->Seek(memkey.data());

  // It is the caller's responsibility to allocate/delete operands list
  assert(operands != nullptr);

  bool merge_in_progress = s->IsMergeInProgress();
  auto merge_operator = options.merge_operator.get();
  // Bound by reference: "options" outlives this call, so there is no need to
  // copy the smart pointer on every lookup.
  const auto& logger = options.info_log;
  std::string merge_result;

  for (; iter->Valid(); iter->Next()) {
    // entry format is:
    //    klength  varint32
    //    userkey  char[klength-8]
    //    tag      uint64
    //    vlength  varint32
    //    value    char[vlength]
    // Check that it belongs to same user key.  We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter->key();
    uint32_t key_length;
    const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
    if (comparator_.comparator.user_comparator()->Compare(
            Slice(key_ptr, key_length - 8),
            key.user_key()) == 0) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      // The low byte of the tag is the value type (see MemTable::Add).
      switch (static_cast<ValueType>(tag & 0xff)) {
        case kTypeValue: {
          Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
          *s = Status::OK();
          if (merge_in_progress) {
            assert(merge_operator);
            if (!merge_operator->FullMerge(key.user_key(), &v, *operands,
                                           value, logger.get())) {
              RecordTick(options.statistics, NUMBER_MERGE_FAILURES);
              *s = Status::Corruption("Error: Could not perform merge.");
            }
          } else {
            value->assign(v.data(), v.size());
          }
          return true;
        }
        case kTypeDeletion: {
          if (merge_in_progress) {
            assert(merge_operator);
            *s = Status::OK();
            // A deletion acts as "no existing value" for the merge.
            if (!merge_operator->FullMerge(key.user_key(), nullptr, *operands,
                                           value, logger.get())) {
              RecordTick(options.statistics, NUMBER_MERGE_FAILURES);
              *s = Status::Corruption("Error: Could not perform merge.");
            }
          } else {
            *s = Status::NotFound(Slice());
          }
          return true;
        }
        case kTypeMerge: {
          Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
          merge_in_progress = true;
          operands->push_front(v.ToString());
          while(operands->size() >= 2) {
            // The operator is dereferenced below; check it the same way the
            // kTypeValue / kTypeDeletion branches do.
            assert(merge_operator);
            // Attempt to associative merge. (Returns true if successful)
            if (merge_operator->PartialMerge(key.user_key(),
                                             Slice((*operands)[0]),
                                             Slice((*operands)[1]),
                                             &merge_result,
                                             logger.get())) {
              // Replace the two front operands with their merged result.
              operands->pop_front();
              swap(operands->front(), merge_result);
            } else {
              // Stack them because user can't associative merge
              break;
            }
          }
          break;
        }
        case kTypeLogData:
          assert(false);
          break;
      }
    } else {
      // exit loop if user key does not match
      break;
    }
  }

  // No change to value, since we have not yet found a Put/Delete

  if (merge_in_progress) {
    *s = Status::MergeInProgress("");
  }
  return false;
}

}  // namespace rocksdb
back to top