1 #include "cado.h" // IWYU pragma: keep
2 // IWYU pragma: no_include <ext/alloc_traits.h>
3 #include <cstdint> // for SIZE_MAX
4 #include <queue> // for queue, priority_queue
5 #include <utility> // for move
6 #include "clonable-exception.hpp" // for clonable_exception
7 #include "timing.h" // for seconds_thread
8 #include "threadpool.hpp"
9 #include "macros.h"
10
/*
  With multiple queues, when new work is added to a queue, we need to be able
  to wake up one of the threads that prefer work from that queue. Thus we need
  multiple condition variables. If no threads that prefer work from that queue
  are currently waiting, we need to wake up some other thread.

  With k queues, we need k condition variables c[] and k semaphores s[].
  When a thread that prefers queue i waits for work, it increases s[i] and
  starts waiting on c[i].
  When a thread that was waiting is woken up, it decreases s[i].
  When work is added to queue j, the producer checks whether s[j] is non-zero:
  - if so, it signals c[j]
  - if not, it tests whether any other s[l] is non-zero
    - if so, it signals c[l]
    - if not, then no threads are currently sleeping, and nothing needs to be done

  We use a simple size_t variable as the semaphore; accesses are mutex-protected.
*/
28
worker_thread(thread_pool & _pool,const size_t _preferred_queue,bool several_threads)29 worker_thread::worker_thread(thread_pool &_pool, const size_t _preferred_queue, bool several_threads)
30 : pool(_pool), preferred_queue(several_threads ? _preferred_queue : SIZE_MAX)
31 {
32 if (!several_threads) {
33 // the "pthread" member is uninitialized, but that is fine since
34 // it's only used in the dtor anyway, under the condition that
35 // preferred_queue != SIZE_MAX.
36 // coverity[uninit_member]
37 return;
38 }
39 int rc = pthread_create(&thread, NULL, pool.thread_work_on_tasks_static, this);
40 ASSERT_ALWAYS(rc == 0);
41 }
42
~worker_thread()43 worker_thread::~worker_thread() {
44 if (preferred_queue == SIZE_MAX) return;
45 int rc = pthread_join(thread, NULL);
46 // timer.self will be essentially lost here. the best thing to do is to
47 // create a phony task which collects the busy wait times for all
48 // threads, at regular intervals, so that timer.self will be
49 // insignificant.
50 // pool.timer += timer;
51 ASSERT_ALWAYS_NOTHROW(rc == 0);
52 }
53
rank() const54 int worker_thread::rank() const { return this - pool.threads.data(); }
nthreads() const55 int worker_thread::nthreads() const { return pool.threads.size(); }
is_synchronous() const56 bool worker_thread::is_synchronous() const { return pool.is_synchronous(); }
57
58 class thread_task {
59 public:
60 task_function_t func = nullptr;
61 task_parameters * parameters = nullptr;
62 int id = 0;
63 double cost = 0.0; // costly tasks are scheduled first.
64
is_terminal()65 bool is_terminal() { return func == NULL; }
66
thread_task(task_function_t _func,task_parameters * _parameters,int _id,double _cost)67 thread_task(task_function_t _func, task_parameters *_parameters, int _id, double _cost) :
68 func(_func), parameters(_parameters), id(_id), cost(_cost) {}
thread_task(bool)69 thread_task(bool) {}
operator ()(worker_thread * w)70 task_result * operator()(worker_thread * w) { return (*func)(w, parameters, id); }
71 };
72
73 struct thread_task_cmp
74 {
operator ()thread_task_cmp75 bool operator() (thread_task const &x, thread_task const &y) const {
76 if (x.cost < y.cost)
77 return true;
78 if (x.cost > y.cost)
79 return false;
80 // if costs are equal, compare ids (they should be distinct)
81 return x.id < y.id;
82 }
83 };
84
85 class tasks_queue : public std::priority_queue<thread_task, std::vector<thread_task>, thread_task_cmp>, private NonCopyable {
86 public:
87 condition_variable not_empty;
88 size_t nr_threads_waiting;
tasks_queue()89 tasks_queue() : nr_threads_waiting(0){};
90 };
91
92 class results_queue : public std::queue<task_result *>, private NonCopyable {
93 public:
94 condition_variable not_empty;
95 };
96
97 class exceptions_queue : public std::queue<clonable_exception *>, private NonCopyable {
98 public:
99 condition_variable not_empty;
100 };
101
102
thread_pool(const size_t nr_threads,double & store_wait_time,const size_t nr_queues)103 thread_pool::thread_pool(const size_t nr_threads, double & store_wait_time, const size_t nr_queues)
104 :
105 monitor_or_synchronous(nr_threads == 1),
106 tasks(nr_queues), results(nr_queues), exceptions(nr_queues),
107 created(nr_queues, 0), joined(nr_queues, 0),
108 kill_threads(false),
109 store_wait_time(store_wait_time)
110 {
111 /* Threads start accessing the queues as soon as they run */
112 threads.reserve(nr_threads);
113 for (size_t i = 0; i < nr_threads; i++)
114 threads.emplace_back(*this, 0, !is_synchronous());
115 };
116
117 // ok, if an exception is raised we'll die abruptly.
118 // coverity[exn_spec_violation]
~thread_pool()119 thread_pool::~thread_pool() {
120 drain_all_queues();
121 enter();
122 kill_threads = true;
123 for (auto & T : tasks) broadcast(T.not_empty); /* Wakey wakey, time to die */
124 leave();
125 drain_all_queues();
126 threads.clear(); /* does pthread_join */
127 for (auto const & T : tasks) ASSERT_ALWAYS_NOTHROW(T.empty());
128 for (auto const & R : results) ASSERT_ALWAYS_NOTHROW(R.empty());
129 for (auto const & E : exceptions) ASSERT_ALWAYS_NOTHROW(E.empty());
130 store_wait_time += cumulated_wait_time;
131 }
132
133 void *
thread_work_on_tasks_static(void * arg)134 thread_pool::thread_work_on_tasks_static(void *arg)
135 {
136 worker_thread *I = static_cast<worker_thread *>(arg);
137 I->pool.thread_work_on_tasks(*I);
138 return NULL;
139 }
140
141 void
thread_work_on_tasks(worker_thread & I)142 thread_pool::thread_work_on_tasks(worker_thread & I)
143 {
144 /* we removed the per-thread timer, because that goes in the way
145 * of our intent to make threads more special-q agnostic: timers are
146 * attached to the nfs_aux structure, now. This implies that all
147 * routines that are called as worker threads must activate their timer
148 * on entry.
149 *
150 */
151 ASSERT_ALWAYS(!is_synchronous());
152 double tt = -seconds_thread();
153 while (1) {
154 size_t queue = I.preferred_queue;
155 thread_task task = get_task(queue);
156 if (task.is_terminal())
157 break;
158 try {
159 tt += seconds_thread();
160 task_result *result = task(&I);
161 tt -= seconds_thread();
162 if (result != NULL)
163 add_result(queue, result);
164 } catch (clonable_exception const& e) {
165 tt -= seconds_thread();
166 add_exception(queue, e.clone());
167 /* We need to wake the listener... */
168 add_result(queue, NULL);
169 }
170 }
171 tt += seconds_thread();
172 /* tt is now the wall-clock time spent really within this function,
173 * waiting for mutexes and condition variables... */
174 std::lock_guard<std::mutex> dummy(mm_cumulated_wait_time);
175 cumulated_wait_time += tt;
176 }
177
178 bool
all_task_queues_empty() const179 thread_pool::all_task_queues_empty() const
180 {
181 for (auto const & T : tasks)
182 if (!T.empty()) return false;
183 return true;
184 }
185
186
187 void
add_task(task_function_t func,task_parameters * params,const int id,const size_t queue,double cost)188 thread_pool::add_task(task_function_t func, task_parameters * params,
189 const int id, const size_t queue, double cost)
190 {
191 if (is_synchronous()) {
192 /* Execute the function right away, simulate the action of a
193 * secondary thread fetching it from the task queue */
194 created[queue]++;
195 try {
196 task_result *result = func(threads.data(), params, id);
197 if (result != NULL)
198 results[queue].push(result);
199 } catch (clonable_exception const& e) {
200 exceptions[queue].push(e.clone());
201 /* We do this in the asynchronous case. It isn't clear that
202 * we need to do the same in the syncronous case. */
203 results[queue].push(NULL);
204 }
205 return;
206 }
207 ASSERT_ALWAYS(queue < tasks.size());
208 enter();
209 ASSERT_ALWAYS(!kill_threads);
210 tasks[queue].push(thread_task(func, params, id, cost));
211 created[queue]++;
212
213 /* Find a queue with waiting threads, starting with "queue" */
214 size_t i = queue;
215 if (tasks[i].nr_threads_waiting == 0) {
216 for (i = 0; i < tasks.size() && tasks[i].nr_threads_waiting == 0; i++) {}
217 }
218 /* If any queue with waiting threads was found, wake up one of them */
219 if (i < tasks.size())
220 signal(tasks[i].not_empty);
221 leave();
222 }
223
224 thread_task
get_task(size_t & preferred_queue)225 thread_pool::get_task(size_t& preferred_queue)
226 {
227 ASSERT(!is_synchronous());
228 enter();
229 while (!kill_threads && all_task_queues_empty()) {
230 /* No work -> tell this thread to wait until work becomes available.
231 We also leave the loop when the thread needs to die.
232 The while() protects against spurious wake-ups that can fire even if
233 the queue is still empty. */
234 tasks[preferred_queue].nr_threads_waiting++;
235 wait(tasks[preferred_queue].not_empty);
236 tasks[preferred_queue].nr_threads_waiting--;
237 }
238 thread_task task(true);
239 if (kill_threads && all_task_queues_empty()) {
240 /* then the default object above is appropriate for signaling all
241 * workers so that they terminate.
242 */
243 } else {
244 /* Find a non-empty task queue, starting with the preferred one */
245 size_t& i(preferred_queue);
246 if (tasks[i].empty()) {
247 for (i = 0; i < tasks.size() && tasks[i].empty(); i++) {}
248 }
249 /* There must have been a non-empty queue or we'd still be in the while()
250 loop above */
251 ASSERT_ALWAYS(i < tasks.size());
252 task = std::move(tasks[i].top());
253 tasks[i].pop();
254 }
255 leave();
256 return task;
257 }
258
259 void
add_result(const size_t queue,task_result * const result)260 thread_pool::add_result(const size_t queue, task_result *const result) {
261 ASSERT(!is_synchronous()); // synchronous case: see add_task
262 ASSERT_ALWAYS(queue < results.size());
263 enter();
264 results[queue].push(result);
265 signal(results[queue].not_empty);
266 leave();
267 }
268
269 void
add_exception(const size_t queue,clonable_exception * e)270 thread_pool::add_exception(const size_t queue, clonable_exception * e) {
271 ASSERT(!is_synchronous()); // synchronous case: see add_task
272 ASSERT_ALWAYS(queue < results.size());
273 enter();
274 exceptions[queue].push(e);
275 // do we use it ?
276 signal(results[queue].not_empty);
277 leave();
278 }
279
280 /* Get a result from the specified results queue. If no result is available,
281 waits with blocking=true, and returns NULL with blocking=false. */
282 task_result *
get_result(const size_t queue,const bool blocking)283 thread_pool::get_result(const size_t queue, const bool blocking) {
284 task_result *result;
285 ASSERT_ALWAYS(queue < results.size());
286
287 /* works both in synchronous and non-synchronous case */
288 enter();
289 if (!blocking and results[queue].empty()) {
290 result = NULL;
291 } else {
292 while (results[queue].empty())
293 wait(results[queue].not_empty);
294 result = results[queue].front();
295 results[queue].pop();
296 joined[queue]++;
297 }
298 leave();
299 return result;
300 }
301
drain_queue(const size_t queue,void (* f)(task_result *))302 void thread_pool::drain_queue(const size_t queue, void (*f)(task_result*))
303 {
304 /* works both in synchronous and non-synchronous case */
305 enter();
306 for(size_t cr = created[queue]; joined[queue] < cr ; ) {
307 while (results[queue].empty())
308 wait(results[queue].not_empty);
309 task_result * result = results[queue].front();
310 results[queue].pop();
311 joined[queue]++;
312 if (f) f(result);
313 delete result;
314 }
315 leave();
316 }
drain_all_queues()317 void thread_pool::drain_all_queues()
318 {
319 for(size_t queue = 0 ; queue < results.size() ; ++queue) {
320 drain_queue(queue);
321 }
322 }
323
324 /* get an exception from the specified exceptions queue. This is
325 * obviously non-blocking, because exceptions are exceptional. So when no
326 * exception is there, we return NULL. When there is one, we return a
327 * pointer to a newly allocated copy of it.
328 */
get_exception(const size_t queue)329 clonable_exception * thread_pool::get_exception(const size_t queue) {
330 clonable_exception * e = NULL;
331 ASSERT_ALWAYS(queue < exceptions.size());
332 /* works both in synchronous and non-synchronous case */
333 enter();
334 if (!exceptions[queue].empty()) {
335 e = exceptions[queue].front();
336 exceptions[queue].pop();
337 }
338 leave();
339 return e;
340 }
341