https://github.com/facebook/rocksdb
Raw File
Tip revision: 97bf78721b7d9c1fa25e6a9b38b693d45e85196d authored by Jay Zhuang on 14 October 2020, 00:46:31 UTC
Update HISTORY.md and version.h for 6.13.3
Tip revision: 97bf787
work_queue.h
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

/*
 * Copyright (c) 2016-present, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 */
#pragma once

#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

namespace ROCKSDB_NAMESPACE {

/// Thread-safe work queue; unbounded by default, with an optional maximum size.
//
// This file is an excerpt from Facebook's zstd repo at
// https://github.com/facebook/zstd/. The relevant file is
// contrib/pzstd/utils/WorkQueue.h.

template <typename T>
class WorkQueue {
  // Protects all member variable access
  std::mutex mutex_;
  // Notified when an item is pushed and when `finish()` is called.
  std::condition_variable readerCv_;
  // Notified when an item is popped, when the max size changes, and when
  // `finish()` is called.
  std::condition_variable writerCv_;
  // Notified when `finish()` is called.
  std::condition_variable finishCv_;

  std::queue<T> queue_;
  // Set once by `finish()`; never reset.
  bool done_;
  // Maximum number of queued items; 0 means no limit.
  std::size_t maxSize_;

  // Must have lock to call this function
  //
  // Returns true iff a size limit is set and the queue has reached it.
  bool full() const { return maxSize_ != 0 && queue_.size() >= maxSize_; }

 public:
  /**
   * Constructs an empty work queue with an optional max size.
   * If `maxSize == 0` the queue size is unbounded.
   *
   * @param maxSize The maximum allowed size of the work queue.
   */
  WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}

  /**
   * Push an item onto the work queue.  Notify a single thread that work is
   * available.  If the queue is full, block until there is room or `finish()`
   * has been called.  If `finish()` has been called, do nothing and return
   * false.  If `push()` returns false, then `item` has not been copied or
   * moved from.
   *
   * @param item  Item to push onto the queue.
   * @returns     True upon success, false if `finish()` has been called.  An
   *               item was pushed iff `push()` returns true.
   */
  template <typename U>
  bool push(U&& item) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      // Sleep while the queue is full, unless the queue has been finished.
      writerCv_.wait(lock, [this] { return done_ || !full(); });
      if (done_) {
        return false;
      }
      queue_.push(std::forward<U>(item));
    }
    // Notify after releasing the lock so the woken reader can take it at once.
    readerCv_.notify_one();
    return true;
  }

  /**
   * Attempts to pop an item off the work queue.  It will block until data is
   * available or `finish()` has been called.  Items that are still queued
   * when `finish()` is called can still be popped.
   *
   * @param[out] item  If `pop` returns `true`, it contains the popped item,
   *                    which has been moved out of the queue.
   *                    If `pop` returns `false`, it is unmodified.
   * @returns          True upon success.  False if the queue is empty and
   *                    `finish()` has been called.
   */
  bool pop(T& item) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      // Sleep while there is nothing to pop, unless the queue is finished.
      readerCv_.wait(lock, [this] { return done_ || !queue_.empty(); });
      if (queue_.empty()) {
        assert(done_);
        return false;
      }
      // The front element is discarded right after, so move it out instead of
      // copying.  This also lets the queue hold move-only types.
      item = std::move(queue_.front());
      queue_.pop();
    }
    // A slot was freed; wake one writer that may be blocked on a full queue.
    writerCv_.notify_one();
    return true;
  }

  /**
   * Sets the maximum queue size.  If `maxSize == 0` then it is unbounded.
   * Wakes all blocked writers so they re-check against the new limit.
   *
   * @param maxSize The new maximum queue size.
   */
  void setMaxSize(std::size_t maxSize) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      maxSize_ = maxSize;
    }
    writerCv_.notify_all();
  }

  /**
   * Promise that `push()` won't be called again, so once the queue is empty
   * there will never be any more work.  Must be called at most once (checked
   * with an assertion).  Wakes every thread blocked in `push()`, `pop()` or
   * `waitUntilFinished()`.
   */
  void finish() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      assert(!done_);
      done_ = true;
    }
    readerCv_.notify_all();
    writerCv_.notify_all();
    finishCv_.notify_all();
  }

  /// Blocks until `finish()` has been called (but the queue may not be empty).
  void waitUntilFinished() {
    std::unique_lock<std::mutex> lock(mutex_);
    finishCv_.wait(lock, [this] { return done_; });
  }
};
}  // namespace ROCKSDB_NAMESPACE
back to top