Revision 68d69899a152e09edfc012fd20a43a7b787a090e authored by Sagar Vemuri on 23 March 2018, 00:34:52 UTC, committed by Sagar Vemuri on 23 March 2018, 18:15:55 UTC
Summary:
Fsync after writing global sequence number to the ingestion file in ExternalSstFileIngestionJob. Otherwise the file metadata could be incorrect.
Closes https://github.com/facebook/rocksdb/pull/3644

Differential Revision: D7373813

Pulled By: sagar0

fbshipit-source-id: 4da2c9e71a8beb5c08b4ac955f288ee1576358b8
1 parent b4fc156
Raw File
write_buffer_manager.cc
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "rocksdb/write_buffer_manager.h"
#include <mutex>
#include "util/coding.h"

namespace rocksdb {
#ifndef ROCKSDB_LITE
namespace {
const size_t kSizeDummyEntry = 1024 * 1024;
// The key will be longer than keys for blocks in SST files so they won't
// conflict.
const size_t kCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
}  // namespace

struct WriteBufferManager::CacheRep {
  std::shared_ptr<Cache> cache_;
  std::mutex cache_mutex_;
  std::atomic<size_t> cache_allocated_size_;
  // The non-prefix part will be updated according to the ID to use.
  char cache_key_[kCacheKeyPrefix + kMaxVarint64Length];
  uint64_t next_cache_key_id_ = 0;
  std::vector<Cache::Handle*> dummy_handles_;

  explicit CacheRep(std::shared_ptr<Cache> cache)
      : cache_(cache), cache_allocated_size_(0) {
    memset(cache_key_, 0, kCacheKeyPrefix);
    size_t pointer_size = sizeof(const void*);
    assert(pointer_size <= kCacheKeyPrefix);
    memcpy(cache_key_, static_cast<const void*>(this), pointer_size);
  }

  Slice GetNextCacheKey() {
    memset(cache_key_ + kCacheKeyPrefix, 0, kMaxVarint64Length);
    char* end =
        EncodeVarint64(cache_key_ + kCacheKeyPrefix, next_cache_key_id_++);
    return Slice(cache_key_, static_cast<size_t>(end - cache_key_));
  }
};
#else
struct WriteBufferManager::CacheRep {};
#endif  // ROCKSDB_LITE

WriteBufferManager::WriteBufferManager(size_t _buffer_size,
                                       std::shared_ptr<Cache> cache)
    : buffer_size_(_buffer_size),
      mutable_limit_(buffer_size_ * 7 / 8),
      memory_used_(0),
      memory_active_(0),
      cache_rep_(nullptr) {
#ifndef ROCKSDB_LITE
  if (cache) {
    // Construct the cache key using the pointer to this.
    cache_rep_.reset(new CacheRep(cache));
  }
#endif  // ROCKSDB_LITE
}

WriteBufferManager::~WriteBufferManager() {
#ifndef ROCKSDB_LITE
  if (cache_rep_) {
    for (auto* handle : cache_rep_->dummy_handles_) {
      cache_rep_->cache_->Release(handle, true);
    }
  }
#endif  // ROCKSDB_LITE
}

// Should only be called from write thread
void WriteBufferManager::ReserveMemWithCache(size_t mem) {
#ifndef ROCKSDB_LITE
  assert(cache_rep_ != nullptr);
  // Use a mutex to protect various data structures. Can be optimzied to a
  // lock-free solution if it ends up with a performance bottleneck.
  std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);

  size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem;
  memory_used_.store(new_mem_used, std::memory_order_relaxed);
  while (new_mem_used > cache_rep_->cache_allocated_size_) {
    // Expand size by at least 1MB.
    // Add a dummy record to the cache
    Cache::Handle* handle;
    cache_rep_->cache_->Insert(cache_rep_->GetNextCacheKey(), nullptr,
                               kSizeDummyEntry, nullptr, &handle);
    cache_rep_->dummy_handles_.push_back(handle);
    cache_rep_->cache_allocated_size_ += kSizeDummyEntry;
  }
#endif  // ROCKSDB_LITE
}

void WriteBufferManager::FreeMemWithCache(size_t mem) {
#ifndef ROCKSDB_LITE
  assert(cache_rep_ != nullptr);
  // Use a mutex to protect various data structures. Can be optimzied to a
  // lock-free solution if it ends up with a performance bottleneck.
  std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);
  size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem;
  memory_used_.store(new_mem_used, std::memory_order_relaxed);
  // Gradually shrink memory costed in the block cache if the actual
  // usage is less than 3/4 of what we reserve from the block cache.
  // We do this becausse:
  // 1. we don't pay the cost of the block cache immediately a memtable is
  //    freed, as block cache insert is expensive;
  // 2. eventually, if we walk away from a temporary memtable size increase,
  //    we make sure shrink the memory costed in block cache over time.
  // In this way, we only shrink costed memory showly even there is enough
  // margin.
  if (new_mem_used < cache_rep_->cache_allocated_size_ / 4 * 3 &&
      cache_rep_->cache_allocated_size_ - kSizeDummyEntry > new_mem_used) {
    assert(!cache_rep_->dummy_handles_.empty());
    cache_rep_->cache_->Release(cache_rep_->dummy_handles_.back(), true);
    cache_rep_->dummy_handles_.pop_back();
    cache_rep_->cache_allocated_size_ -= kSizeDummyEntry;
  }
#endif  // ROCKSDB_LITE
}
}  // namespace rocksdb
back to top