Revision fd66005628f8187139601fdf245e6fff302b1aee authored by Akanksha Mahajan on 30 March 2022, 20:52:37 UTC, committed by Facebook GitHub Bot on 30 March 2022, 20:52:37 UTC
Summary:
Same as title

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9750

Test Plan:
export CRASH_TEST_EXT_ARGS="--async_io=1 --adaptive_readahead=1"
make -j crash_test

Reviewed By: jay-zhuang

Differential Revision: D35114326

Pulled By: akankshamahajan15

fbshipit-source-id: 8b05c95be09f7aff6cb9eb757aa20a6520349d45
1 parent 60106b9
Raw File
random_access_file_reader.h
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once
#include <atomic>
#include <sstream>
#include <string>

#include "env/file_system_tracer.h"
#include "port/port.h"
#include "rocksdb/file_system.h"
#include "rocksdb/listener.h"
#include "rocksdb/options.h"
#include "rocksdb/rate_limiter.h"
#include "util/aligned_buffer.h"

namespace ROCKSDB_NAMESPACE {
class Statistics;
class HistogramImpl;
class SystemClock;

// Owning handle to a heap-allocated char array. Read() and MultiRead() use it
// to hand the internally allocated direct-IO buffer back to the caller.
using AlignedBuf = std::unique_ptr<char[]>;

// Align the request r according to alignment and return the aligned result.
// NOTE(review): presumably widens [offset, offset + len] outward to multiples
// of `alignment` for direct IO; the definition is out of line — confirm there.
FSReadRequest Align(const FSReadRequest& r, size_t alignment);

// Try to merge src to dest if they have overlap.
//
// Each request represents an inclusive interval [offset, offset + len].
// If the intervals have overlap, update dest's offset and len to represent
// the merged interval, and return true.
// Otherwise, do nothing and return false.
// NOTE(review): with inclusive end points, requests that merely touch
// (dest.offset + dest.len == src.offset) would count as overlapping; confirm
// against the out-of-line definition.
bool TryMerge(FSReadRequest* dest, const FSReadRequest& src);

// RandomAccessFileReader is a wrapper on top of FSRandomAccessFile. The file
// is held through an FSRandomAccessFilePtr, which is also given the optional
// IOTracer. It is responsible for:
// - Handling Buffered and Direct reads appropriately.
// - Rate limiting compaction reads.
// - Notifying any interested listeners on the completion of a read.
// - Updating IO stats.
//
// The reader takes ownership of the FSRandomAccessFile passed to its
// constructor and is neither copyable nor copy-assignable.
class RandomAccessFileReader {
 private:
#ifndef ROCKSDB_LITE
  // Builds a kRead FileOperationInfo (file name, timing, status, file
  // temperature, offset and length) and passes it to OnFileReadFinish() of
  // every listener kept by the constructor.
  // NOTE(review): unlike NotifyOnIOError there is no empty-listener early
  // return here, so callers presumably gate on ShouldNotifyListeners() —
  // confirm in the .cc before adding one (it would also skip the copy of
  // `status` into `info`).
  void NotifyOnFileReadFinish(
      uint64_t offset, size_t length,
      const FileOperationInfo::StartTimePoint& start_ts,
      const FileOperationInfo::FinishTimePoint& finish_ts,
      const Status& status) const {
    FileOperationInfo info(FileOperationType::kRead, file_name_, start_ts,
                           finish_ts, status, file_temperature_);
    info.offset = offset;
    info.length = length;

    for (auto& listener : listeners_) {
      listener->OnFileReadFinish(info);
    }
    // The listeners may ignore `info.status`; that must not be reported as an
    // unchecked status.
    info.status.PermitUncheckedError();
  }

  // Reports an IO error on `file_path` to every listener kept by the
  // constructor via OnIOError(). It returns immediately, without building an
  // IOErrorInfo, when there are no listeners.
  void NotifyOnIOError(const IOStatus& io_status, FileOperationType operation,
                       const std::string& file_path, size_t length,
                       uint64_t offset) const {
    if (listeners_.empty()) {
      return;
    }
    IOErrorInfo io_error_info(io_status, operation, file_path, length, offset);

    for (auto& listener : listeners_) {
      listener->OnIOError(io_error_info);
    }
    io_status.PermitUncheckedError();
  }

#endif  // ROCKSDB_LITE

  // True when at least one listener opted in to file-IO notifications.
  // Always false in ROCKSDB_LITE builds, where listeners_ is never filled.
  bool ShouldNotifyListeners() const { return !listeners_.empty(); }

  FSRandomAccessFilePtr file_;
  std::string file_name_;
  // The pointers below are stored as given. This class declares no
  // destructor, so it never deletes them.
  SystemClock* clock_;
  Statistics* stats_;
  uint32_t hist_type_;
  HistogramImpl* file_read_hist_;
  RateLimiter* rate_limiter_;
  // Only listeners whose ShouldBeNotifiedOnFileIO() returned true at
  // construction time.
  std::vector<std::shared_ptr<EventListener>> listeners_;
  const Temperature file_temperature_;
  const bool is_last_level_;

 public:
  // Wraps `raf`, taking ownership of it. Every argument after `_file_name` is
  // optional. Of `listeners`, only those whose ShouldBeNotifiedOnFileIO()
  // returns true are kept; in ROCKSDB_LITE builds none are kept.
  explicit RandomAccessFileReader(
      std::unique_ptr<FSRandomAccessFile>&& raf, const std::string& _file_name,
      SystemClock* clock = nullptr,
      const std::shared_ptr<IOTracer>& io_tracer = nullptr,
      Statistics* stats = nullptr, uint32_t hist_type = 0,
      HistogramImpl* file_read_hist = nullptr,
      RateLimiter* rate_limiter = nullptr,
      const std::vector<std::shared_ptr<EventListener>>& listeners = {},
      Temperature file_temperature = Temperature::kUnknown,
      bool is_last_level = false)
      : file_(std::move(raf), io_tracer, _file_name),
        // `_file_name` is a const reference, so std::move() here would only
        // copy in disguise; copy explicitly.
        file_name_(_file_name),
        clock_(clock),
        stats_(stats),
        hist_type_(hist_type),
        file_read_hist_(file_read_hist),
        rate_limiter_(rate_limiter),
        listeners_(),
        file_temperature_(file_temperature),
        is_last_level_(is_last_level) {
#ifndef ROCKSDB_LITE
    // Filter once here so that the per-read notification paths only ever
    // walk listeners that actually want file-IO events.
    for (const auto& listener : listeners) {
      if (listener->ShouldBeNotifiedOnFileIO()) {
        listeners_.emplace_back(listener);
      }
    }
#else  // !ROCKSDB_LITE
    (void)listeners;
#endif
  }

  // Factory that builds a reader for `fname` on `fs` and stores it in
  // `*reader`. Defined out of line.
  static IOStatus Create(const std::shared_ptr<FileSystem>& fs,
                         const std::string& fname, const FileOptions& file_opts,
                         std::unique_ptr<RandomAccessFileReader>* reader,
                         IODebugContext* dbg);
  RandomAccessFileReader(const RandomAccessFileReader&) = delete;
  RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete;

  // In non-direct IO mode,
  // 1. if using mmap, result is stored in a buffer other than scratch;
  // 2. if not using mmap, result is stored in the buffer starting from scratch.
  //
  // In direct IO mode, an aligned buffer is allocated internally.
  // 1. If aligned_buf is null, then results are copied to the buffer
  // starting from scratch;
  // 2. Otherwise, scratch is not used and can be null, the aligned_buf owns
  // the internally allocated buffer on return, and the result refers to a
  // region in aligned_buf.
  //
  // `rate_limiter_priority` is used to charge the internal rate limiter when
  // enabled. The special value `Env::IO_TOTAL` makes this operation bypass the
  // rate limiter.
  IOStatus Read(const IOOptions& opts, uint64_t offset, size_t n, Slice* result,
                char* scratch, AlignedBuf* aligned_buf,
                Env::IOPriority rate_limiter_priority) const;

  // REQUIRES:
  // num_reqs > 0, reqs do not overlap, and offsets in reqs are increasing.
  // In non-direct IO mode, aligned_buf should be null;
  // In direct IO mode, aligned_buf stores the aligned buffer allocated inside
  // MultiRead, the result Slices in reqs refer to aligned_buf.
  //
  // `rate_limiter_priority` will be used to charge the internal rate limiter.
  // It is not yet supported so the client must provide the special value
  // `Env::IO_TOTAL` to bypass the rate limiter.
  IOStatus MultiRead(const IOOptions& opts, FSReadRequest* reqs,
                     size_t num_reqs, AlignedBuf* aligned_buf,
                     Env::IOPriority rate_limiter_priority) const;

  // Forwards a prefetch hint for [offset, offset + n) straight to the wrapped
  // file, using default IOOptions and no debug context. It is not routed
  // through the rate limiter, stats or listeners here.
  IOStatus Prefetch(uint64_t offset, size_t n) const {
    return file_->Prefetch(offset, n, IOOptions(), nullptr);
  }

  // Non-owning access to the wrapped file.
  FSRandomAccessFile* file() { return file_.get(); }

  const std::string& file_name() const { return file_name_; }

  // Whether the wrapped file was opened for direct IO.
  bool use_direct_io() const { return file_->use_direct_io(); }

  // Fills `opts` for a read issued under `ro`. Defined out of line.
  // NOTE(review): presumably derives deadlines/timeouts from `ro` — confirm.
  IOStatus PrepareIOOptions(const ReadOptions& ro, IOOptions& opts);

  // Asynchronous read of `req`. Defined out of line.
  // NOTE(review): judging by the parameters, `cb(req, cb_arg)` is invoked on
  // completion, and `*io_handle` / `*del_fn` receive a handle plus its
  // deleter for the caller to manage — confirm against the .cc.
  IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
                     std::function<void(const FSReadRequest&, void*)> cb,
                     void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
                     Env::IOPriority rate_limiter_priority);
};
}  // namespace ROCKSDB_NAMESPACE
back to top