https://github.com/BenLangmead/bowtie2
Tip revision: a43fa6f43f54989468a294967898f85b9fe4cefa authored by Rone Charles on 17 January 2024, 00:01:39 UTC
Remove whitespace character preventing the removal -lrt from LDLIBS
Remove whitespace character preventing the removal -lrt from LDLIBS
Tip revision: a43fa6f
threadpool.h
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <map>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
template<typename T>
class threadsafe_queue {
private:
mutable std::mutex mut;
std::queue<T> data_queue;
public:
threadsafe_queue() {}
void push(T &&new_value) {
std::lock_guard<std::mutex> lk(mut);
data_queue.emplace(new_value);
}
bool try_pop(T& value) {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return false;
value = std::move(data_queue.front());
data_queue.pop();
return true;
}
size_t size() {
std::lock_guard<std::mutex> lk(mut);
return data_queue.size();
}
};
class thread_pool
{
std::atomic_bool done;
int nthreads;
std::map<std::thread::id, int> thread_id;
threadsafe_queue<std::function<void()>> work_queue;
std::vector<std::thread> threads;
std::condition_variable cv;
std::mutex m;
void worker_thread() {
while (!done) {
std::function<void()> task;
if (work_queue.try_pop(task)) {
task();
} else {
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [&] {return work_queue.size() != 0 || done; });
}
}
}
public:
thread_pool(int nthr):
done(false), nthreads(nthr)
{
try {
for (int i = 0; i < nthreads; ++i) {
threads.emplace_back(
std::thread(&thread_pool::worker_thread, this));
thread_id[threads[i].get_id()] = i;
}
} catch (...) {
done = true;
throw;
}
}
~thread_pool() {
if (nthreads > 0) {
done = true;
std::unique_lock<std::mutex> lock(m);
cv.notify_all();
lock.unlock();
for (std::thread &thread : threads) {
thread.join();
}
}
}
template<typename Function, typename... Args>
std::future<typename std::result_of<Function(Args...)>::type>
submit(Function &&f, Args&&... args) {
using result_type = typename std::result_of<Function(Args...)>::type;
auto task = std::make_shared<std::packaged_task<result_type()>>(std::bind(std::forward<Function>(f), std::forward<Args>(args)...));
std::future<result_type> res(task->get_future());
work_queue.push([task] { (*task)(); });
std::unique_lock<std::mutex> lock(m);
cv.notify_one();
return res;
}
int size() {
return nthreads;
}
int thread_id_to_int(std::thread::id id) {
return thread_id[id];
}
template<typename T, typename Function>
void parallel_for(T start, T end, T stride, Function &&f) {
T range = end - start;
T block_size = range / (nthreads);
T block_start = start;
T block_end = block_start + block_size;
if (block_size == 0)
block_end = end;
std::vector<std::future<void>> res;
while (block_start < end) {
res.emplace_back(submit(f, block_start, block_end, stride));
block_start = block_end;
block_end = block_end + block_size;
if (block_end >= end)
block_end = end;
}
for (size_t i = 0; i < res.size(); i++)
res[i].get();
}
};
#endif