Revision 6d58ea901d64626745b96cc9ac072b5c1c5c8be4 authored by Adam Retter on 27 November 2019, 00:55:46 UTC, committed by Facebook Github Bot on 27 November 2019, 02:24:09 UTC
Summary:
**NOTE**: this also needs to be back-ported to 6.4.6 and possibly older branches if further releases from them is envisaged.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6081

Differential Revision: D18710107

Pulled By: zhichao-cao

fbshipit-source-id: 03260f9316566e2bfc12c7d702d6338bb7941e01
1 parent 8ae149e
Raw File
readahead_raf.cc
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/readahead_raf.h"

#include <algorithm>
#include <mutex>
#include "file/read_write_util.h"
#include "util/aligned_buffer.h"
#include "util/rate_limiter.h"

namespace rocksdb {
namespace {
class ReadaheadRandomAccessFile : public RandomAccessFile {
 public:
  ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
                            size_t readahead_size)
      : file_(std::move(file)),
        alignment_(file_->GetRequiredBufferAlignment()),
        readahead_size_(Roundup(readahead_size, alignment_)),
        buffer_(),
        buffer_offset_(0) {
    buffer_.Alignment(alignment_);
    buffer_.AllocateNewBuffer(readahead_size_);
  }

  ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete;

  ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) =
      delete;

  Status Read(uint64_t offset, size_t n, Slice* result,
              char* scratch) const override {
    // Read-ahead only make sense if we have some slack left after reading
    if (n + alignment_ >= readahead_size_) {
      return file_->Read(offset, n, result, scratch);
    }

    std::unique_lock<std::mutex> lk(lock_);

    size_t cached_len = 0;
    // Check if there is a cache hit, meaning that [offset, offset + n) is
    // either completely or partially in the buffer. If it's completely cached,
    // including end of file case when offset + n is greater than EOF, then
    // return.
    if (TryReadFromCache(offset, n, &cached_len, scratch) &&
        (cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
      // We read exactly what we needed, or we hit end of file - return.
      *result = Slice(scratch, cached_len);
      return Status::OK();
    }
    size_t advanced_offset = static_cast<size_t>(offset + cached_len);
    // In the case of cache hit advanced_offset is already aligned, means that
    // chunk_offset equals to advanced_offset
    size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset);

    Status s = ReadIntoBuffer(chunk_offset, readahead_size_);
    if (s.ok()) {
      // The data we need is now in cache, so we can safely read it
      size_t remaining_len;
      TryReadFromCache(advanced_offset, n - cached_len, &remaining_len,
                       scratch + cached_len);
      *result = Slice(scratch, cached_len + remaining_len);
    }
    return s;
  }

  Status Prefetch(uint64_t offset, size_t n) override {
    if (n < readahead_size_) {
      // Don't allow smaller prefetches than the configured `readahead_size_`.
      // `Read()` assumes a smaller prefetch buffer indicates EOF was reached.
      return Status::OK();
    }

    std::unique_lock<std::mutex> lk(lock_);

    size_t offset_ = static_cast<size_t>(offset);
    size_t prefetch_offset = TruncateToPageBoundary(alignment_, offset_);
    if (prefetch_offset == buffer_offset_) {
      return Status::OK();
    }
    return ReadIntoBuffer(prefetch_offset,
                          Roundup(offset_ + n, alignment_) - prefetch_offset);
  }

  size_t GetUniqueId(char* id, size_t max_size) const override {
    return file_->GetUniqueId(id, max_size);
  }

  void Hint(AccessPattern pattern) override { file_->Hint(pattern); }

  Status InvalidateCache(size_t offset, size_t length) override {
    std::unique_lock<std::mutex> lk(lock_);
    buffer_.Clear();
    return file_->InvalidateCache(offset, length);
  }

  bool use_direct_io() const override { return file_->use_direct_io(); }

 private:
  // Tries to read from buffer_ n bytes starting at offset. If anything was read
  // from the cache, it sets cached_len to the number of bytes actually read,
  // copies these number of bytes to scratch and returns true.
  // If nothing was read sets cached_len to 0 and returns false.
  bool TryReadFromCache(uint64_t offset, size_t n, size_t* cached_len,
                        char* scratch) const {
    if (offset < buffer_offset_ ||
        offset >= buffer_offset_ + buffer_.CurrentSize()) {
      *cached_len = 0;
      return false;
    }
    uint64_t offset_in_buffer = offset - buffer_offset_;
    *cached_len = std::min(
        buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
    memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
    return true;
  }

  // Reads into buffer_ the next n bytes from file_ starting at offset.
  // Can actually read less if EOF was reached.
  // Returns the status of the read operastion on the file.
  Status ReadIntoBuffer(uint64_t offset, size_t n) const {
    if (n > buffer_.Capacity()) {
      n = buffer_.Capacity();
    }
    assert(IsFileSectorAligned(offset, alignment_));
    assert(IsFileSectorAligned(n, alignment_));
    Slice result;
    Status s = file_->Read(offset, n, &result, buffer_.BufferStart());
    if (s.ok()) {
      buffer_offset_ = offset;
      buffer_.Size(result.size());
      assert(result.size() == 0 || buffer_.BufferStart() == result.data());
    }
    return s;
  }

  const std::unique_ptr<RandomAccessFile> file_;
  const size_t alignment_;
  const size_t readahead_size_;

  mutable std::mutex lock_;
  // The buffer storing the prefetched data
  mutable AlignedBuffer buffer_;
  // The offset in file_, corresponding to data stored in buffer_
  mutable uint64_t buffer_offset_;
};
}  // namespace

std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
    std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size) {
  std::unique_ptr<RandomAccessFile> result(
      new ReadaheadRandomAccessFile(std::move(file), readahead_size));
  return result;
}
}  // namespace rocksdb
back to top