-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy paththread_pool.h
120 lines (99 loc) · 2.87 KB
/
thread_pool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <array>
#include <atomic>
#include <csignal>
#include <cstddef>
#include <deque>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include "count_semaphore.h"
/// Fixed-size pool of worker threads that execute queued tasks.
///
/// Tasks are stored in a mutex-protected FIFO queue; the `has_job` semaphore
/// is posted once per queued task so that a sleeping worker wakes up for it.
///
/// @tparam thread_num Number of worker threads owned by the pool (default 4).
template <std::size_t thread_num=4>
class thread_pool {
public:
    /// Launches `thread_num` worker threads.
    thread_pool();

    // The pool owns threads and a mutex, so it is neither copyable nor assignable.
    thread_pool(const thread_pool&) = delete;
    thread_pool& operator=(const thread_pool&) = delete;

    /// Requests shutdown, wakes the workers and joins them.
    ~thread_pool();

    /// Queues `func(args...)` for execution on a worker thread.
    /// @return A future that yields the call's result.
    template <typename Func, typename... Args>
    std::future<std::invoke_result_t<Func, Args...>> add_task(Func&&, Args&&... args);

    /// Queues the nullary callable `func` for execution on a worker thread.
    /// @return A future that yields the call's result.
    template <typename Func>
    std::future<std::invoke_result_t<Func>> add_task(Func&&);

    // NOTE(review): size() is declared here but no definition is visible in
    // this header; confirm where (or whether) it is defined before using it.
    std::size_t size() const;

    /// Raises the quit flag. Workers observe it the next time they check;
    /// workers blocked on the semaphore are not woken by this call.
    void stop();

private:
    std::array<std::thread, thread_num> workers;   ///< The worker threads.
    count_semaphore has_job;                       ///< Posted once per queued task.
    std::atomic_size_t idle_threads;               ///< Workers not currently running a task.
    std::deque<std::function<void()>> queue;       ///< Pending tasks, FIFO; guarded by q_mutex.
    std::mutex q_mutex;                            ///< Protects `queue`.
    // Written by stop() on the caller's thread while workers read it, so it
    // must be atomic; a plain std::sig_atomic_t here would be a data race.
    std::atomic<bool> quit;
};
/* ------------Member Implementation------------- */
/// Initialises the pool state and launches N worker threads.
///
/// Each worker loops until the quit flag is set: it blocks on the `has_job`
/// semaphore, takes the oldest task from the queue under the queue mutex,
/// and runs it with the mutex released. `idle_threads` is decremented while
/// a task is running and incremented again once it finishes.
template <std::size_t N>
thread_pool<N>::thread_pool()
    : has_job(0), idle_threads(N), quit(0)
{
    auto routine = [this]
    {
        while (!quit) {
            has_job.wait();
            // A wake-up may be a shutdown signal rather than a new task.
            if (quit)
                break;

            std::function<void()> job;
            {
                std::lock_guard<std::mutex> lock(q_mutex);
                // Never call front() on an empty deque; go back to waiting.
                if (queue.empty())
                    continue;
                // Move instead of copying the type-erased callable.
                job = std::move(queue.front());
                queue.pop_front();
            }

            // The task runs outside the lock so other workers can dequeue.
            idle_threads.fetch_sub(1, std::memory_order_release);
            job();
            idle_threads.fetch_add(1, std::memory_order_release);
        }
    };

    for (auto& worker : workers) {
        worker = std::thread(routine);
    }
}
/// Shuts the pool down: raises the quit flag, wakes the workers and joins them.
///
/// Tasks still waiting in the queue are not executed by this destructor.
template <std::size_t N>
thread_pool<N>::~thread_pool()
{
    stop();

    // Post once per worker instead of once per *currently idle* worker.
    // The idle count can change while this loop runs, and comparing a moving
    // target with `!=` could step past it and keep posting indefinitely.
    // A worker consumes at most one post before it sees the quit flag and
    // exits, so N posts are enough to release every worker that may be
    // blocked on the semaphore.
    for (std::size_t i = 0; i != N; ++i)
        has_job.post();

    for (auto& worker : workers) {
        if (worker.joinable())
            worker.join();
    }
}
/// Queues `func(args...)` for execution on a worker thread.
///
/// The callable and its arguments are decay-copied (or moved) into the task
/// and passed to the callable as rvalues when the task runs, in the manner
/// of std::async. Wrap an argument in std::ref to pass it by reference.
///
/// @param func Callable to invoke.
/// @param args Arguments to invoke it with.
/// @return A future that yields the result (or exception) of the call.
template <std::size_t N>
template <typename Func, typename... Args>
std::future<std::invoke_result_t<Func, Args...>> thread_pool<N>::add_task(Func&& func, Args&&... args)
{
    using result_type = std::invoke_result_t<Func, Args...>;

    // std::packaged_task is move-only while std::function requires a copyable
    // callable, so the task lives behind a shared_ptr that the queued wrapper
    // copies.
    auto task = std::make_shared<std::packaged_task<result_type()>>(
        [fn = std::forward<Func>(func),
         bound = std::make_tuple(std::forward<Args>(args)...)]() mutable -> result_type {
            return std::apply(std::move(fn), std::move(bound));
        });

    // Take the future before publishing the task so that this thread does not
    // touch the packaged_task while a worker may already be invoking it.
    auto result = task->get_future();
    {
        std::lock_guard<std::mutex> lock(q_mutex);
        queue.emplace_back([task] { (*task)(); });
    }
    has_job.post();
    return result;
}
/// Queues the nullary callable `func` for execution on a worker thread.
///
/// The callable is copied or moved into the task depending on how it was
/// passed.
///
/// @param func Callable taking no arguments.
/// @return A future that yields the result (or exception) of the call.
template <std::size_t N>
template <typename Func>
std::future<std::invoke_result_t<Func>> thread_pool<N>::add_task(Func&& func)
{
    using result_type = std::invoke_result_t<Func>;

    // The packaged_task is move-only, so it is shared with the copyable
    // wrapper stored in the queue through a shared_ptr.
    auto task = std::make_shared<std::packaged_task<result_type()>>(
        [fn = std::forward<Func>(func)]() mutable -> result_type { return fn(); });

    // Take the future before publishing the task so that this thread does not
    // touch the packaged_task while a worker may already be invoking it.
    auto result = task->get_future();
    {
        std::lock_guard<std::mutex> lock(q_mutex);
        queue.emplace_back([task] { (*task)(); });
    }
    has_job.post();
    return result;
}
/// Requests shutdown by raising the quit flag.
///
/// Only the flag is written here; nothing is posted to the semaphore, so a
/// worker that is blocked waiting for a job is not woken by this call.
template <std::size_t N>
void thread_pool<N>::stop()
{
    this->quit = 1;
}
#endif