https://github.com/Kitware/CMake
Raw File
Tip revision: 13e46189c7f3b39a26e9ca689bc029b7061d26a7 authored by Brad King on 16 November 2022, 14:42:03 UTC
CMake 3.25.0
Tip revision: 13e4618
cmWorkerPool.cxx
/* Distributed under the OSI-approved BSD 3-Clause License.  See accompanying
   file Copyright.txt or https://cmake.org/licensing for details.  */
#include "cmWorkerPool.h"

#include <algorithm>
#include <array>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

#include <cm/memory>

#include <cm3p/uv.h>

#include "cmRange.h"
#include "cmStringAlgorithms.h"
#include "cmUVHandlePtr.h"
#include "cmUVSignalHackRAII.h" // IWYU pragma: keep

/**
 * @brief libuv pipe buffer class
 */
class cmUVPipeBuffer
{
public:
  //! Range of bytes handed to the data callback
  using DataRange = cmRange<const char*>;
  //! Callback that receives each chunk of data read from the pipe
  using DataFunction = std::function<void(DataRange)>;
  /// Callback invoked when reading ends.  The ssize_t argument is 0 on
  /// end of file and a non zero libuv error code on error.
  using EndFunction = std::function<void(ssize_t)>;

  /**
   * Reset to construction state
   *
   * Clears both callbacks, releases the read buffer memory and resets the
   * pipe handle.  Does nothing if no pipe handle exists.
   */
  void reset();

  /**
   * Resets this buffer, then initializes uv_pipe(), uv_stream() and
   * uv_handle() on the given loop
   * @return true on success, false if @a uv_loop is null or the libuv
   *         pipe initialization fails
   */
  bool init(uv_loop_t* uv_loop);

  /**
   * Start reading
   *
   * Requires an initialized pipe and two callable callbacks.
   * @param dataFunction called with every chunk of data that was read
   * @param endFunction called once when reading ends
   * @return true on success
   */
  bool startRead(DataFunction dataFunction, EndFunction endFunction);

  //! libuv pipe (null when not initialized or after reset())
  uv_pipe_t* uv_pipe() const { return this->UVPipe_.get(); }
  //! uv_pipe() casted to libuv stream
  uv_stream_t* uv_stream() const
  {
    return static_cast<uv_stream_t*>(this->UVPipe_);
  }
  //! uv_pipe() casted to libuv handle
  uv_handle_t* uv_handle() { return static_cast<uv_handle_t*>(this->UVPipe_); }

private:
  // -- Libuv callbacks
  //! Allocation callback: offers Buffer_, resized to the suggested size
  static void UVAlloc(uv_handle_t* handle, size_t suggestedSize,
                      uv_buf_t* buf);
  //! Read callback: forwards data or, on negative nread, ends reading
  static void UVData(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf);

  //! Pipe handle; its data pointer refers back to this object (see init())
  cm::uv_pipe_ptr UVPipe_;
  //! Storage that libuv reads into
  std::vector<char> Buffer_;
  //! Receives the data that was read
  DataFunction DataFunction_;
  //! Receives the end of stream / error notification
  EndFunction EndFunction_;
};

/**
 * Return the buffer to its construction state.
 * Without a pipe handle there is nothing to release and the call is a no-op.
 */
void cmUVPipeBuffer::reset()
{
  if (this->UVPipe_.get() == nullptr) {
    return;
  }
  // Drop the callbacks, free the read buffer, then release the handle
  this->EndFunction_ = nullptr;
  this->DataFunction_ = nullptr;
  this->Buffer_.clear();
  this->Buffer_.shrink_to_fit();
  this->UVPipe_.reset();
}

/**
 * Reset this buffer and create a fresh pipe handle on @a uv_loop.
 * The handle's data pointer is set to this object.
 * @return true if the loop is non-null and the pipe was initialized
 */
bool cmUVPipeBuffer::init(uv_loop_t* uv_loop)
{
  // Always start from a clean state, even if the loop turns out to be null
  this->reset();
  if (uv_loop != nullptr) {
    return this->UVPipe_.init(*uv_loop, 0, this) == 0;
  }
  return false;
}

/**
 * Store the callbacks and ask libuv to start reading from the pipe.
 * @return false if the pipe is not initialized, a callback is empty or
 *         libuv refuses to start reading
 */
bool cmUVPipeBuffer::startRead(DataFunction dataFunction,
                               EndFunction endFunction)
{
  bool const havePipe = (this->UVPipe_.get() != nullptr);
  bool const haveCallbacks = dataFunction && endFunction;
  if (!havePipe || !haveCallbacks) {
    return false;
  }
  // Take ownership of the callbacks before libuv may invoke them
  this->DataFunction_ = std::move(dataFunction);
  this->EndFunction_ = std::move(endFunction);
  return uv_read_start(this->uv_stream(), &cmUVPipeBuffer::UVAlloc,
                       &cmUVPipeBuffer::UVData) == 0;
}

/**
 * libuv allocation callback.
 * Resizes the owning buffer to @a suggestedSize and offers it to libuv.
 */
void cmUVPipeBuffer::UVAlloc(uv_handle_t* handle, size_t suggestedSize,
                             uv_buf_t* buf)
{
  // The handle's data pointer was set to the owning cmUVPipeBuffer in init()
  auto* self = static_cast<cmUVPipeBuffer*>(handle->data);
  std::vector<char>& storage = self->Buffer_;
  storage.resize(suggestedSize);
  buf->len = static_cast<unsigned long>(storage.size());
  buf->base = storage.data();
}

/**
 * libuv read callback.
 * Positive @a nread forwards the data to the data function, negative
 * @a nread ends reading, zero is ignored.
 */
void cmUVPipeBuffer::UVData(uv_stream_t* stream, ssize_t nread,
                            const uv_buf_t* buf)
{
  auto* self = static_cast<cmUVPipeBuffer*>(stream->data);
  if (nread < 0) {
    // Keep the end function alive on the stack; reset() clears the member
    EndFunction onEnd;
    onEnd.swap(self->EndFunction_);
    // The pipe is reset before the end function runs
    self->reset();
    // End of file is reported as 0, anything else as the libuv error code
    ssize_t const status = (nread == UV_EOF) ? 0 : nread;
    onEnd(status);
    return;
  }
  if ((nread > 0) && (buf->base != nullptr)) {
    const char* const first = buf->base;
    self->DataFunction_(DataRange(first, first + nread));
  }
}

/**
 * @brief External process management class
 */
class cmUVReadOnlyProcess
{
public:
  // -- Types
  //! @brief Process settings
  struct SetupT
  {
    //! Directory passed to libuv as the child's working directory
    std::string WorkingDirectory;
    //! Executable followed by its arguments
    std::vector<std::string> Command;
    //! Receives output, exit status and error message; not owned
    cmWorkerPool::ProcessResultT* Result = nullptr;
    //! If true, stderr data is appended to StdOut instead of StdErr
    bool MergedOutput = false;
  };

  // -- Const accessors
  //! Settings stored by setup()
  SetupT const& Setup() const { return this->Setup_; }
  //! Result object stored by setup(); null until setup() was called
  cmWorkerPool::ProcessResultT* Result() const { return this->Setup_.Result; }
  //! True once start() succeeded
  bool IsStarted() const { return this->IsStarted_; }
  //! True once the process has exited and both pipes are closed
  bool IsFinished() const { return this->IsFinished_; }

  // -- Runtime
  /**
   * Store the process settings.  Nothing is spawned here.
   */
  void setup(cmWorkerPool::ProcessResultT* result, bool mergedOutput,
             std::vector<std::string> const& command,
             std::string const& workingDirectory = std::string());
  /**
   * Spawn the process on the given loop and start reading its output.
   * @param finishedCallback stored on success and invoked by UVTryFinish()
   * @return true if the process was started
   */
  bool start(uv_loop_t* uv_loop, std::function<void()> finishedCallback);

private:
  // -- Libuv callbacks
  //! Process exit callback: records exit status and termination signal
  static void UVExit(uv_process_t* handle, int64_t exitStatus, int termSignal);
  //! Appends stdout data to the result
  void UVPipeOutData(cmUVPipeBuffer::DataRange data) const;
  //! Records a stdout read error, if any, and tries to finish
  void UVPipeOutEnd(ssize_t error);
  //! Appends stderr data to the result (to StdOut when output is merged)
  void UVPipeErrData(cmUVPipeBuffer::DataRange data) const;
  //! Records a stderr read error, if any, and tries to finish
  void UVPipeErrEnd(ssize_t error);
  //! Invokes the finished callback once process and both pipes are done
  void UVTryFinish();

  // -- Setup
  SetupT Setup_;
  // -- Runtime
  bool IsStarted_ = false;
  bool IsFinished_ = false;
  //! Invoked by UVTryFinish()
  std::function<void()> FinishedCallback_;
  //! Null terminated argument vector pointing into Setup_.Command
  std::vector<const char*> CommandPtr_;
  //! stdio setup for stdin, stdout and stderr (in that order)
  std::array<uv_stdio_container_t, 3> UVOptionsStdIO_;
  uv_process_options_t UVOptions_;
  cm::uv_process_ptr UVProcess_;
  cmUVPipeBuffer UVPipeOut_;
  cmUVPipeBuffer UVPipeErr_;
};

/**
 * Store the settings for a later start().
 * Only copies the arguments into the setup structure.
 */
void cmUVReadOnlyProcess::setup(cmWorkerPool::ProcessResultT* result,
                                bool mergedOutput,
                                std::vector<std::string> const& command,
                                std::string const& workingDirectory)
{
  SetupT& cfg = this->Setup_;
  cfg.WorkingDirectory = workingDirectory;
  cfg.Command = command;
  cfg.Result = result;
  cfg.MergedOutput = mergedOutput;
}

/**
 * Spawn the configured command on the given libuv loop.
 *
 * Fails right away if the process was already started or no result object
 * was set up.  Otherwise the result is reset and the steps below run in
 * order; each later step runs only while Result()->error() is still false.
 * If any step recorded an error, all libuv handles are reset again.
 *
 * @param uv_loop loop on which the pipes and the process are created
 * @param finishedCallback stored on success and invoked by UVTryFinish()
 * @return true if the process was started
 */
bool cmUVReadOnlyProcess::start(uv_loop_t* uv_loop,
                                std::function<void()> finishedCallback)
{
  if (this->IsStarted() || (this->Result() == nullptr)) {
    return false;
  }

  // Reset result before the start
  this->Result()->reset();

  // Fill command string pointers
  // (the pointers refer to the strings owned by Setup_.Command)
  if (!this->Setup().Command.empty()) {
    this->CommandPtr_.reserve(this->Setup().Command.size() + 1);
    for (std::string const& arg : this->Setup().Command) {
      this->CommandPtr_.push_back(arg.c_str());
    }
    // Terminate the argument vector with a null pointer
    this->CommandPtr_.push_back(nullptr);
  } else {
    this->Result()->ErrorMessage = "Empty command";
  }

  // -- Create the stdout and stderr pipes
  if (!this->Result()->error()) {
    if (!this->UVPipeOut_.init(uv_loop)) {
      this->Result()->ErrorMessage = "libuv stdout pipe initialization failed";
    }
  }
  if (!this->Result()->error()) {
    if (!this->UVPipeErr_.init(uv_loop)) {
      this->Result()->ErrorMessage = "libuv stderr pipe initialization failed";
    }
  }
  if (!this->Result()->error()) {
    // -- Setup process stdio options
    // stdin
    this->UVOptionsStdIO_[0].flags = UV_IGNORE;
    this->UVOptionsStdIO_[0].data.stream = nullptr;
    // stdout
    this->UVOptionsStdIO_[1].flags =
      static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
    this->UVOptionsStdIO_[1].data.stream = this->UVPipeOut_.uv_stream();
    // stderr
    this->UVOptionsStdIO_[2].flags =
      static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
    this->UVOptionsStdIO_[2].data.stream = this->UVPipeErr_.uv_stream();

    // -- Setup process options
    // Zero all option bytes first, then set only the fields that are used
    std::fill_n(reinterpret_cast<char*>(&this->UVOptions_),
                sizeof(this->UVOptions_), 0);
    this->UVOptions_.exit_cb = &cmUVReadOnlyProcess::UVExit;
    this->UVOptions_.file = this->CommandPtr_[0];
    this->UVOptions_.args = const_cast<char**>(this->CommandPtr_.data());
    this->UVOptions_.cwd = this->Setup_.WorkingDirectory.c_str();
    this->UVOptions_.flags = UV_PROCESS_WINDOWS_HIDE;
    this->UVOptions_.stdio_count =
      static_cast<int>(this->UVOptionsStdIO_.size());
    this->UVOptions_.stdio = this->UVOptionsStdIO_.data();

    // -- Spawn process
    // The process handle's data pointer is set to this object for UVExit()
    int uvErrorCode = this->UVProcess_.spawn(*uv_loop, this->UVOptions_, this);
    if (uvErrorCode != 0) {
      this->Result()->ErrorMessage = "libuv process spawn failed";
      if (const char* uvErr = uv_strerror(uvErrorCode)) {
        this->Result()->ErrorMessage += ": ";
        this->Result()->ErrorMessage += uvErr;
      }
    }
  }
  // -- Start reading from stdio streams
  if (!this->Result()->error()) {
    if (!this->UVPipeOut_.startRead(
          [this](cmUVPipeBuffer::DataRange range) {
            this->UVPipeOutData(range);
          },
          [this](ssize_t error) { this->UVPipeOutEnd(error); })) {
      this->Result()->ErrorMessage =
        "libuv start reading from stdout pipe failed";
    }
  }
  if (!this->Result()->error()) {
    if (!this->UVPipeErr_.startRead(
          [this](cmUVPipeBuffer::DataRange range) {
            this->UVPipeErrData(range);
          },
          [this](ssize_t error) { this->UVPipeErrEnd(error); })) {
      this->Result()->ErrorMessage =
        "libuv start reading from stderr pipe failed";
    }
  }

  if (!this->Result()->error()) {
    // Everything is running: remember the callback for UVTryFinish()
    this->IsStarted_ = true;
    this->FinishedCallback_ = std::move(finishedCallback);
  } else {
    // Clear libuv handles and finish
    this->UVProcess_.reset();
    this->UVPipeOut_.reset();
    this->UVPipeErr_.reset();
    this->CommandPtr_.clear();
  }

  return this->IsStarted();
}

/**
 * libuv process exit callback.
 * Stores exit status and termination signal in the result, sets an error
 * message if none is set yet, resets the process handle and tries to finish.
 */
void cmUVReadOnlyProcess::UVExit(uv_process_t* handle, int64_t exitStatus,
                                 int termSignal)
{
  auto* self = static_cast<cmUVReadOnlyProcess*>(handle->data);
  // Only a started and not yet finished process is handled
  if (!self->IsStarted() || self->IsFinished()) {
    return;
  }

  cmWorkerPool::ProcessResultT& result = *self->Result();
  result.ExitStatus = exitStatus;
  result.TermSignal = termSignal;
  // Set error message on demand; an earlier message is never overwritten
  if (!result.error()) {
    if (termSignal != 0) {
      result.ErrorMessage =
        cmStrCat("Process was terminated by signal ", result.TermSignal);
    } else if (exitStatus != 0) {
      result.ErrorMessage =
        cmStrCat("Process failed with return value ", result.ExitStatus);
    }
  }

  // Release the process handle, then check whether the pipes are done too
  self->UVProcess_.reset();
  self->UVTryFinish();
}

/**
 * Append a chunk of stdout data to the result's StdOut string.
 */
void cmUVReadOnlyProcess::UVPipeOutData(cmUVPipeBuffer::DataRange data) const
{
  std::string& out = this->Result()->StdOut;
  out.append(data.begin(), data.end());
}

void cmUVReadOnlyProcess::UVPipeOutEnd(ssize_t error)
{
  // Process pipe error
  if ((error != 0) && !this->Result()->error()) {
    this->Result()->ErrorMessage = cmStrCat(
      "Reading from stdout pipe failed with libuv error code ", error);
  }
  // Try finish
  this->UVTryFinish();
}

/**
 * Append a chunk of stderr data to the result.
 * With merged output the data goes to StdOut, otherwise to StdErr.
 */
void cmUVReadOnlyProcess::UVPipeErrData(cmUVPipeBuffer::DataRange data) const
{
  cmWorkerPool::ProcessResultT* const result = this->Result();
  if (this->Setup_.MergedOutput) {
    result->StdOut.append(data.begin(), data.end());
  } else {
    result->StdErr.append(data.begin(), data.end());
  }
}

void cmUVReadOnlyProcess::UVPipeErrEnd(ssize_t error)
{
  // Process pipe error
  if ((error != 0) && !this->Result()->error()) {
    this->Result()->ErrorMessage = cmStrCat(
      "Reading from stderr pipe failed with libuv error code ", error);
  }
  // Try finish
  this->UVTryFinish();
}

/**
 * Signal completion once the process has exited and both pipes are closed.
 *
 * Called from the process exit callback and from both pipe end callbacks.
 * Returns without effect while any of the three handles is still alive.
 */
void cmUVReadOnlyProcess::UVTryFinish()
{
  // There still might be data in the pipes after the process has finished.
  // Therefore check if the process is finished AND all pipes are closed
  // before signaling the worker thread to continue.
  if ((this->UVProcess_.get() != nullptr) ||
      (this->UVPipeOut_.uv_pipe() != nullptr) ||
      (this->UVPipeErr_.uv_pipe() != nullptr)) {
    return;
  }
  this->IsFinished_ = true;
  // The callback may destroy this object (cmWorkerPoolWorker resets its
  // process pointer in UVProcessFinished()).  Move the callback onto the
  // stack first so the std::function being invoked is not destroyed while
  // it is still running; cmUVPipeBuffer::UVData() does the same for its
  // end function.
  std::function<void()> finished;
  finished.swap(this->FinishedCallback_);
  finished();
}

/**
 * @brief Worker pool worker thread
 */
class cmWorkerPoolWorker
{
public:
  /**
   * Registers the asynchronous process start request on the given loop.
   * Explicit, so that a uv_loop_t is never converted to a worker implicitly.
   */
  explicit cmWorkerPoolWorker(uv_loop_t& uvLoop);
  //! Joins the internal thread if it is joinable
  ~cmWorkerPoolWorker();

  cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete;
  cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete;

  /**
   * Set the internal thread
   */
  void SetThread(std::thread&& aThread) { this->Thread_ = std::move(aThread); }

  /**
   * Run an external process
   *
   * Blocks the calling thread until the process has been finished and
   * destroyed.  stderr output is merged into @a result's StdOut.
   * @return true if @a result reports no error, false otherwise or if
   *         @a command is empty
   */
  bool RunProcess(cmWorkerPool::ProcessResultT& result,
                  std::vector<std::string> const& command,
                  std::string const& workingDirectory);

private:
  // -- Libuv callbacks
  //! Async callback: starts the pending process, if any
  static void UVProcessStart(uv_async_t* handle);
  //! Destroys the finished (or never started) process and wakes RunProcess()
  void UVProcessFinished();

  // -- Process management
  struct
  {
    //! Guards ROP
    std::mutex Mutex;
    //! Async request that triggers UVProcessStart()
    cm::uv_async_ptr Request;
    //! Notified when ROP may have been destroyed
    std::condition_variable Condition;
    //! The process currently being run, null when there is none
    std::unique_ptr<cmUVReadOnlyProcess> ROP;
  } Proc_;
  // -- System thread
  std::thread Thread_;
};

/**
 * Register the asynchronous process start request on @a uvLoop.
 */
cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop)
{
  // UVProcessStart() finds this worker through the handle's data pointer
  auto const startSlot = &cmWorkerPoolWorker::UVProcessStart;
  this->Proc_.Request.init(uvLoop, startSlot, this);
}

/**
 * Wait for the worker thread to end, if one was set.
 */
cmWorkerPoolWorker::~cmWorkerPoolWorker()
{
  std::thread& thread = this->Thread_;
  if (!thread.joinable()) {
    return;
  }
  thread.join();
}

/**
 * Run an external process and block until it is done.
 *
 * The process object is created here, started through the async request
 * and destroyed by UVProcessFinished().
 * @return false for an empty command, otherwise whether @a result is
 *         free of errors
 */
bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result,
                                    std::vector<std::string> const& command,
                                    std::string const& workingDirectory)
{
  if (command.empty()) {
    return false;
  }
  auto& proc = this->Proc_;
  // Create process instance (stderr is merged into stdout)
  {
    std::lock_guard<std::mutex> guard(proc.Mutex);
    proc.ROP = cm::make_unique<cmUVReadOnlyProcess>();
    proc.ROP->setup(&result, true, command, workingDirectory);
  }
  // Ask the libuv loop to start the process
  proc.Request.send();
  // Sleep until the process object has been destroyed again
  {
    std::unique_lock<std::mutex> waitLock(proc.Mutex);
    proc.Condition.wait(waitLock, [&proc] { return !proc.ROP; });
  }
  return !result.error();
}

/**
 * libuv async callback that starts the pending process of a worker.
 * If starting fails, the process is cleaned up right away.
 */
void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
{
  auto* const worker = static_cast<cmWorkerPoolWorker*>(handle->data);
  bool started = true;
  {
    std::lock_guard<std::mutex> guard(worker->Proc_.Mutex);
    auto& rop = worker->Proc_.ROP;
    // Only a process that exists and was not started yet is started
    if (rop && !rop->IsStarted()) {
      started =
        rop->start(handle->loop, [worker] { worker->UVProcessFinished(); });
    }
  }
  // UVProcessFinished() locks the mutex itself, so call it after the scope
  if (!started) {
    worker->UVProcessFinished();
  }
}

/**
 * Destroy the process object if it is finished or was never started,
 * then wake the thread waiting in RunProcess().
 */
void cmWorkerPoolWorker::UVProcessFinished()
{
  std::lock_guard<std::mutex> guard(this->Proc_.Mutex);
  auto& rop = this->Proc_.ROP;
  if (rop) {
    bool const disposable = !rop->IsStarted() || rop->IsFinished();
    if (disposable) {
      rop.reset();
    }
  }
  // Notify idling thread
  this->Proc_.Condition.notify_one();
}

/**
 * @brief Private worker pool internals
 */
class cmWorkerPoolInternal
{
public:
  // -- Constructors
  /**
   * Initializes the libuv loop.
   * Explicit, so that a cmWorkerPool pointer is never converted implicitly.
   */
  explicit cmWorkerPoolInternal(cmWorkerPool* pool);
  //! Closes the libuv loop
  ~cmWorkerPoolInternal();

  /**
   * Runs the libuv loop.
   * @return true if uv_run() returned 0
   */
  bool Process();

  /**
   * Clear queue and abort threads.
   */
  void Abort();

  /**
   * Push a job to the queue and notify a worker.
   * @return false if the pool is aborting, true otherwise
   */
  bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);

  /**
   * Worker thread main loop method.
   */
  void Work(unsigned int workerIndex);

  // -- Request slots
  //! Async callback: creates the workers and starts their threads
  static void UVSlotBegin(uv_async_t* handle);
  //! Async callback: joins and destroys the workers
  static void UVSlotEnd(uv_async_t* handle);

  // -- UV loop
#ifdef CMAKE_UV_SIGNAL_HACK
  std::unique_ptr<cmUVSignalHackRAII> UVHackRAII;
#endif
  std::unique_ptr<uv_loop_t> UVLoop;
  cm::uv_async_ptr UVRequestBegin;
  cm::uv_async_ptr UVRequestEnd;

  // -- Thread pool and job queue
  //! Guards the flags, counters and the queue below
  std::mutex Mutex;
  //! True while Process() is running
  bool Processing = false;
  //! True once Abort() was called
  bool Aborting = false;
  //! True while a fence job is waiting or being processed
  bool FenceProcessing = false;
  //! Number of threads currently inside Work()
  unsigned int WorkersRunning = 0;
  //! Number of threads waiting for a job on an empty queue
  unsigned int WorkersIdle = 0;
  //! Number of jobs currently being processed
  unsigned int JobsProcessing = 0;
  std::deque<cmWorkerPool::JobHandleT> Queue;
  //! Main condition: new job, abort, fence done
  std::condition_variable Condition;
  //! Wakes the worker that waits to process a fence job
  std::condition_variable ConditionFence;
  std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;

  // -- References
  cmWorkerPool* Pool = nullptr;
};

/**
 * Reset the result to its initial state.
 * Status values are zeroed; every non-empty string is cleared and asked
 * to give its memory back.
 */
void cmWorkerPool::ProcessResultT::reset()
{
  // Empty strings are left untouched
  auto const release = [](std::string& text) {
    if (!text.empty()) {
      text.clear();
      text.shrink_to_fit();
    }
  };

  this->ExitStatus = 0;
  this->TermSignal = 0;
  release(this->StdOut);
  release(this->StdErr);
  release(this->ErrorMessage);
}

/**
 * Remember the owning pool and set up the libuv loop.
 */
cmWorkerPoolInternal::cmWorkerPoolInternal(cmWorkerPool* pool)
  : Pool(pool)
{
  // libuv wide setup that precedes the loop creation
  uv_disable_stdio_inheritance();
#ifdef CMAKE_UV_SIGNAL_HACK
  this->UVHackRAII = cm::make_unique<cmUVSignalHackRAII>();
#endif
  // Allocate the loop structure, then let libuv initialize it in place
  this->UVLoop = cm::make_unique<uv_loop_t>();
  uv_loop_init(this->UVLoop.get());
}

/**
 * Close the libuv loop that the constructor initialized.
 */
cmWorkerPoolInternal::~cmWorkerPoolInternal()
{
  uv_loop_t* const loop = this->UVLoop.get();
  uv_loop_close(loop);
}

bool cmWorkerPoolInternal::Process()
{
  // Reset state flags
  this->Processing = true;
  this->Aborting = false;
  // Initialize libuv asynchronous request
  this->UVRequestBegin.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotBegin,
                            this);
  this->UVRequestEnd.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotEnd,
                          this);
  // Send begin request
  this->UVRequestBegin.send();
  // Run libuv loop
  bool success = (uv_run(this->UVLoop.get(), UV_RUN_DEFAULT) == 0);
  // Update state flags
  this->Processing = false;
  this->Aborting = false;
  return success;
}

/**
 * Drop all queued jobs and tell the workers to stop.
 * Repeated calls have no further effect while the abort flag is set.
 */
void cmWorkerPoolInternal::Abort()
{
  std::lock_guard<std::mutex> lock(this->Mutex);
  if (this->Aborting) {
    return;
  }
  // Register abort, clear the queue and wake everyone on the main condition
  this->Aborting = true;
  this->Queue.clear();
  this->Condition.notify_all();
}

/**
 * Append a job to the queue.
 * @return false if the pool is aborting (the job is not taken), true
 *         otherwise
 */
inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle)
{
  std::lock_guard<std::mutex> lock(this->Mutex);
  bool const accepted = !this->Aborting;
  if (accepted) {
    this->Queue.emplace_back(std::move(jobHandle));
    // A notification is only needed if some worker waits for a job
    if (this->WorkersIdle > 0) {
      this->Condition.notify_one();
    }
  }
  return accepted;
}

/**
 * libuv async callback that creates the workers and starts their threads.
 * The number of workers is the pool's thread count.
 */
void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle)
{
  auto* const self = static_cast<cmWorkerPoolInternal*>(handle->data);
  unsigned int const count = self->Pool->ThreadCount();

  // All worker objects are created before the first thread is started
  self->Workers.reserve(count);
  for (unsigned int left = count; left != 0; --left) {
    self->Workers.emplace_back(
      cm::make_unique<cmWorkerPoolWorker>(*self->UVLoop));
  }

  // Every thread runs Work() with the index of its own worker
  for (unsigned int index = 0; index != count; ++index) {
    cmWorkerPoolWorker& worker = *self->Workers[index];
    worker.SetThread(std::thread(&cmWorkerPoolInternal::Work, self, index));
  }

  // The begin request is not needed anymore
  self->UVRequestBegin.reset();
}

/**
 * libuv async callback sent by the last worker thread that leaves Work().
 */
void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle)
{
  auto* const self = static_cast<cmWorkerPoolInternal*>(handle->data);
  // Destroying a worker joins its thread
  self->Workers.clear();
  // The end request is not needed anymore
  self->UVRequestEnd.reset();
}

/**
 * Worker thread main loop.
 *
 * Mutex is held the whole time except while a job is being processed.
 * The loop ends when Aborting is set.  The last worker that leaves the
 * loop sends the UVRequestEnd event.
 *
 * @param workerIndex index that is passed on to every job this thread
 *        processes
 */
void cmWorkerPoolInternal::Work(unsigned int workerIndex)
{
  cmWorkerPool::JobHandleT jobHandle;
  std::unique_lock<std::mutex> uLock(this->Mutex);
  // Increment running workers count
  ++this->WorkersRunning;
  // Enter worker main loop
  while (true) {
    // Abort on request
    if (this->Aborting) {
      break;
    }
    // Wait for new jobs on the main CV
    if (this->Queue.empty()) {
      // Count as idle so that PushJob() knows a notification is needed
      ++this->WorkersIdle;
      this->Condition.wait(uLock);
      --this->WorkersIdle;
      continue;
    }

    // If there is a fence currently active or waiting,
    // sleep on the main CV and try again.
    if (this->FenceProcessing) {
      this->Condition.wait(uLock);
      continue;
    }

    // Pop next job from queue
    jobHandle = std::move(this->Queue.front());
    this->Queue.pop_front();

    // Check for fence jobs
    bool raisedFence = false;
    if (jobHandle->IsFence()) {
      // From now on no other worker takes a job from the queue
      this->FenceProcessing = true;
      raisedFence = true;
      // Wait on the Fence CV until all pending jobs are done.
      while (this->JobsProcessing != 0 && !this->Aborting) {
        this->ConditionFence.wait(uLock);
      }
      // When aborting, explicitly kick all threads alive once more.
      if (this->Aborting) {
        this->FenceProcessing = false;
        this->Condition.notify_all();
        break;
      }
    }

    // Unlocked scope for job processing
    ++this->JobsProcessing;
    {
      uLock.unlock();
      jobHandle->Work(this->Pool, workerIndex); // Process job
      jobHandle.reset();                        // Destroy job
      uLock.lock();
    }
    --this->JobsProcessing;

    // If this was the thread that entered fence processing
    // originally, notify all idling workers that the fence
    // is done.
    if (raisedFence) {
      this->FenceProcessing = false;
      this->Condition.notify_all();
    }
    // If fence processing is still not done, notify the
    // fencing worker when all active jobs are done.
    if (this->FenceProcessing && this->JobsProcessing == 0) {
      this->ConditionFence.notify_all();
    }
  }

  // Decrement running workers count
  if (--this->WorkersRunning == 0) {
    // Last worker thread about to finish. Send libuv event.
    this->UVRequestEnd.send();
  }
}

// Defaulted out of line, in this translation unit
cmWorkerPool::JobT::~JobT() = default;

/**
 * Run an external process through the worker that processes this job.
 * The worker is looked up by this job's worker index; the lookup is
 * range checked.
 * @return the result of the worker's RunProcess()
 */
bool cmWorkerPool::JobT::RunProcess(ProcessResultT& result,
                                    std::vector<std::string> const& command,
                                    std::string const& workingDirectory)
{
  auto const& workers = this->Pool_->Int_->Workers;
  cmWorkerPoolWorker& worker = *workers.at(this->WorkerIndex_);
  return worker.RunProcess(result, command, workingDirectory);
}

// Creates the private internals, which keep a pointer back to this pool
cmWorkerPool::cmWorkerPool()
  : Int_(cm::make_unique<cmWorkerPoolInternal>(this))
{
}

// Defaulted here, where cmWorkerPoolInternal is a complete type
cmWorkerPool::~cmWorkerPool() = default;

/**
 * Set the number of worker threads.
 * Ignored while the pool is processing.  A count of 0 is raised to 1.
 */
void cmWorkerPool::SetThreadCount(unsigned int threadCount)
{
  if (this->Int_->Processing) {
    return;
  }
  unsigned int count = threadCount;
  if (count == 0) {
    count = 1u;
  }
  this->ThreadCount_ = count;
}

/**
 * Run the pool until the libuv loop is done.
 * @param userData stored for the duration of the call and cleared
 *        afterwards
 * @return the result of the internal Process() call
 */
bool cmWorkerPool::Process(void* userData)
{
  // The user data is only set while the loop is running
  this->UserData_ = userData;
  bool const ok = this->Int_->Process();
  this->UserData_ = nullptr;
  return ok;
}

// Forwards the job to the internal queue.
// Returns false if the pool is aborting.
bool cmWorkerPool::PushJob(JobHandleT&& jobHandle)
{
  return this->Int_->PushJob(std::move(jobHandle));
}

// Forwards to the internals: clears the job queue and sets the abort flag
void cmWorkerPool::Abort()
{
  this->Int_->Abort();
}
back to top