1 #include "cado.h" // IWYU pragma: keep
2 // IWYU pragma: no_include <ext/alloc_traits.h>
3 #include <cstdint>                // for SIZE_MAX
4 #include <queue>                   // for queue, priority_queue
5 #include <utility>                 // for move
6 #include "clonable-exception.hpp"  // for clonable_exception
7 #include "timing.h"                // for seconds_thread
8 #include "threadpool.hpp"
9 #include "macros.h"
10 
11 /*
12   With multiple queues, when new work is added to a queue, we need to be able
13   to wake up one of the threads that prefer work from that queue. Thus we need
14   multiple condition variables. If no threads that prefer work from that queue
15   are currently waiting, we need to wake up some other thread.
16 
17   With k queues, we need k condition variables c[] and k semaphores s[].
18   When a thread that prefers queue i waits for work, in increases s[i] and starts waiting on c[i].
19   When a thread that was waiting is woken up, it decreases s[i].
20   When work is added to queue j, it checks whether s[j] is non-zero:
21     - if so, it signals c[j]
22     - if not, it tests whether any other c[l] is non-zero
23       - if so, it signals c[l]
24       - if not, then no threads are currently sleeping, and nothing needs to be done
25 
26   We use a simple size_t variable as the semaphore; accesses are mutex-protected.
27 */
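
/* A minimal usage sketch (illustrative only; it assumes the interface
 * declared in threadpool.hpp, where a task function takes
 * (worker_thread *, task_parameters *, int) and returns a task_result *,
 * and where user parameter/result types derive from task_parameters and
 * task_result):
 *
 *   struct my_params : public task_parameters { int x; };
 *   struct my_result : public task_result { int y; };
 *
 *   task_result * square_task(worker_thread *, task_parameters * p, int) {
 *       my_result * r = new my_result;
 *       r->y = ((my_params *) p)->x * ((my_params *) p)->x;
 *       return r;
 *   }
 *
 *   double wait_time = 0;
 *   thread_pool pool(4, wait_time, 1);          // 4 threads, 1 queue
 *   my_params p; p.x = 7;
 *   pool.add_task(square_task, &p, 0, 0, 0.0);  // id 0, queue 0, cost 0
 *   my_result * r = (my_result *) pool.get_result(0, true); // blocks
 *   delete r;
 *
 * Whether the derived types need additional members (e.g. virtual
 * destructors) depends on the declarations in threadpool.hpp; the above
 * only shows the call sequence implemented below.
 */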

worker_thread::worker_thread(thread_pool &_pool, const size_t _preferred_queue, bool several_threads)
  : pool(_pool), preferred_queue(several_threads ? _preferred_queue : SIZE_MAX)
{
    if (!several_threads) {
        // the "thread" member is uninitialized, but that is fine since
        // it's only used in the dtor anyway, under the condition that
        // preferred_queue != SIZE_MAX.
        // coverity[uninit_member]
        return;
    }
    int rc = pthread_create(&thread, NULL, pool.thread_work_on_tasks_static, this);
    ASSERT_ALWAYS(rc == 0);
}

worker_thread::~worker_thread() {
  if (preferred_queue == SIZE_MAX) return;
  int rc = pthread_join(thread, NULL);
  // timer.self will be essentially lost here. the best thing to do is to
  // create a phony task which collects the busy wait times for all
  // threads, at regular intervals, so that timer.self will be
  // insignificant.
  // pool.timer += timer;
  ASSERT_ALWAYS_NOTHROW(rc == 0);
}

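/* rank() recovers this thread's index by pointer arithmetic into the
 * pool's contiguous vector of workers; nthreads() and is_synchronous()
 * simply forward to the pool. */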
int worker_thread::rank() const { return this - pool.threads.data(); }
int worker_thread::nthreads() const { return pool.threads.size(); }
bool worker_thread::is_synchronous() const { return pool.is_synchronous(); }

class thread_task {
public:
  task_function_t func = nullptr;
  task_parameters * parameters = nullptr;
  int id = 0;
  double cost = 0.0; // costly tasks are scheduled first.

  bool is_terminal() const { return func == nullptr; }

  thread_task(task_function_t _func, task_parameters *_parameters, int _id, double _cost) :
    func(_func), parameters(_parameters), id(_id), cost(_cost) {}
  thread_task(bool) {}
  task_result * operator()(worker_thread * w) { return (*func)(w, parameters, id); }
};

struct thread_task_cmp
{
  bool operator() (thread_task const &x, thread_task const &y) const {
    if (x.cost < y.cost)
      return true;
    if (x.cost > y.cost)
      return false;
    // if costs are equal, compare ids (they should be distinct)
    return x.id < y.id;
  }
};

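/* A tasks_queue is a max-heap on task cost (via thread_task_cmp), so
 * top() always yields the most expensive pending task. It also carries
 * the condition variable and the count of threads currently waiting on
 * it, as described in the comment at the top of this file. */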
class tasks_queue : public std::priority_queue<thread_task, std::vector<thread_task>, thread_task_cmp>, private NonCopyable {
  public:
  condition_variable not_empty;
  size_t nr_threads_waiting;
  tasks_queue() : nr_threads_waiting(0) {}
};

class results_queue : public std::queue<task_result *>, private NonCopyable {
  public:
  condition_variable not_empty;
};

class exceptions_queue : public std::queue<clonable_exception *>, private NonCopyable {
  public:
  condition_variable not_empty;
};


thread_pool::thread_pool(const size_t nr_threads, double & store_wait_time, const size_t nr_queues)
  :
      monitor_or_synchronous(nr_threads == 1),
      tasks(nr_queues), results(nr_queues), exceptions(nr_queues),
      created(nr_queues, 0), joined(nr_queues, 0),
      kill_threads(false),
      store_wait_time(store_wait_time)
{
    /* Threads start accessing the queues as soon as they run */
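    /* In the synchronous case no separate pthread is created at all:
     * the single worker_thread object merely provides the worker
     * argument that task functions expect, and add_task() runs each
     * task directly in the calling thread. */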
    threads.reserve(nr_threads);
    for (size_t i = 0; i < nr_threads; i++)
        threads.emplace_back(*this, 0, !is_synchronous());
}

// ok, if an exception is raised we'll die abruptly.
// coverity[exn_spec_violation]
thread_pool::~thread_pool() {
  drain_all_queues();
  enter();
  kill_threads = true;
  for (auto & T : tasks) broadcast(T.not_empty); /* Wakey wakey, time to die */
  leave();
  drain_all_queues();
  threads.clear();      /* does pthread_join */
  for (auto const & T : tasks) ASSERT_ALWAYS_NOTHROW(T.empty());
  for (auto const & R : results) ASSERT_ALWAYS_NOTHROW(R.empty());
  for (auto const & E : exceptions) ASSERT_ALWAYS_NOTHROW(E.empty());
  store_wait_time += cumulated_wait_time;
}

void *
thread_pool::thread_work_on_tasks_static(void *arg)
{
    worker_thread *I = static_cast<worker_thread *>(arg);
    I->pool.thread_work_on_tasks(*I);
    return NULL;
}

void
thread_pool::thread_work_on_tasks(worker_thread & I)
{
  /* we removed the per-thread timer, because that gets in the way of
   * our intent to make threads more special-q agnostic: timers are
   * attached to the nfs_aux structure now. This implies that all
   * routines that are called as worker threads must activate their
   * timer on entry.
   */
  ASSERT_ALWAYS(!is_synchronous());
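  /* Accounting trick: tt starts at -now, and each task's execution time
   * is excluded by adding now just before the call and subtracting it
   * again just after, so that at the end tt holds only the time this
   * thread spent waiting for work (see the comment below the loop). */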
  double tt = -seconds_thread();
  while (1) {
      size_t queue = I.preferred_queue;
      thread_task task = get_task(queue);
      if (task.is_terminal())
          break;
      try {
          tt += seconds_thread();
          task_result *result = task(&I);
          tt -= seconds_thread();
          if (result != NULL)
              add_result(queue, result);
      } catch (clonable_exception const& e) {
          tt -= seconds_thread();
          add_exception(queue, e.clone());
          /* We need to wake the listener... */
          add_result(queue, NULL);
      }
  }
  tt += seconds_thread();
  /* tt is now the wall-clock time spent within this function itself,
   * waiting for mutexes and condition variables...  */
  std::lock_guard<std::mutex> dummy(mm_cumulated_wait_time);
  cumulated_wait_time += tt;
}

bool
thread_pool::all_task_queues_empty() const
{
  for (auto const & T : tasks)
    if (!T.empty()) return false;
  return true;
}


void
thread_pool::add_task(task_function_t func, task_parameters * params,
        const int id, const size_t queue, double cost)
{
    if (is_synchronous()) {
        /* Execute the function right away, simulating the action of a
         * secondary thread fetching it from the task queue */
        created[queue]++;
        try {
            task_result *result = func(threads.data(), params, id);
            if (result != NULL)
                results[queue].push(result);
        } catch (clonable_exception const& e) {
            exceptions[queue].push(e.clone());
            /* We do this in the asynchronous case. It isn't clear that
             * we need to do the same in the synchronous case. */
            results[queue].push(NULL);
        }
        return;
    }
    ASSERT_ALWAYS(queue < tasks.size());
    enter();
    ASSERT_ALWAYS(!kill_threads);
    tasks[queue].push(thread_task(func, params, id, cost));
    created[queue]++;
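    /* created[queue] counts the tasks ever submitted to this queue;
     * drain_queue() compares it against joined[queue] to decide when
     * every outstanding result has been collected. */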

    /* Find a queue with waiting threads, starting with "queue" */
    size_t i = queue;
    if (tasks[i].nr_threads_waiting == 0) {
      for (i = 0; i < tasks.size() && tasks[i].nr_threads_waiting == 0; i++) {}
    }
    /* If any queue with waiting threads was found, wake up one of them */
    if (i < tasks.size())
      signal(tasks[i].not_empty);
    leave();
}

thread_task
thread_pool::get_task(size_t& preferred_queue)
{
  ASSERT(!is_synchronous());
  enter();
  while (!kill_threads && all_task_queues_empty()) {
    /* No work -> tell this thread to wait until work becomes available.
       We also leave the loop when the thread needs to die.
       The while() protects against spurious wake-ups that can fire even if
       the queue is still empty. */
    tasks[preferred_queue].nr_threads_waiting++;
    wait(tasks[preferred_queue].not_empty);
    tasks[preferred_queue].nr_threads_waiting--;
  }
  thread_task task(true);
  if (kill_threads && all_task_queues_empty()) {
      /* then the terminal task constructed above is returned as-is,
       * which tells the worker that receives it to shut down.
       */
  } else {
    /* Find a non-empty task queue, starting with the preferred one */
    size_t& i(preferred_queue);
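    /* i aliases preferred_queue, so the caller's variable ends up
     * holding the queue the task was actually taken from; the task's
     * result or exception is then posted to that same queue. */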
    if (tasks[i].empty()) {
      for (i = 0; i < tasks.size() && tasks[i].empty(); i++) {}
    }
    /* There must have been a non-empty queue or we'd still be in the while()
       loop above */
    ASSERT_ALWAYS(i < tasks.size());
    task = std::move(tasks[i].top());
    tasks[i].pop();
  }
  leave();
  return task;
}

void
thread_pool::add_result(const size_t queue, task_result *const result) {
  ASSERT(!is_synchronous());       // synchronous case: see add_task
  ASSERT_ALWAYS(queue < results.size());
  enter();
  results[queue].push(result);
  signal(results[queue].not_empty);
  leave();
}

void
thread_pool::add_exception(const size_t queue, clonable_exception * e) {
  ASSERT(!is_synchronous());       // synchronous case: see add_task
  ASSERT_ALWAYS(queue < results.size());
  enter();
  exceptions[queue].push(e);
  // do we use it ?
  signal(results[queue].not_empty);
  leave();
}

/* Get a result from the specified results queue. If no result is available,
   wait if blocking=true, or return NULL if blocking=false. */
task_result *
thread_pool::get_result(const size_t queue, const bool blocking) {
  task_result *result;
  ASSERT_ALWAYS(queue < results.size());

  /* works both in synchronous and non-synchronous case */
  enter();
  if (!blocking and results[queue].empty()) {
    result = NULL;
  } else {
    while (results[queue].empty())
      wait(results[queue].not_empty);
    result = results[queue].front();
    results[queue].pop();
    joined[queue]++;
  }
  leave();
  return result;
}

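/* Collect, and delete, results from the given queue until every task
 * that was ever added to it has been joined; each result is passed to
 * f (if f is non-NULL) just before being deleted. */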
void thread_pool::drain_queue(const size_t queue, void (*f)(task_result*))
{
    /* works both in synchronous and non-synchronous case */
    enter();
    for(size_t cr = created[queue]; joined[queue] < cr ; ) {
        while (results[queue].empty())
            wait(results[queue].not_empty);
        task_result * result = results[queue].front();
        results[queue].pop();
        joined[queue]++;
        if (f) f(result);
        delete result;
    }
    leave();
}
void thread_pool::drain_all_queues()
{
    for(size_t queue = 0 ; queue < results.size() ; ++queue) {
        drain_queue(queue);
    }
}

/* get an exception from the specified exceptions queue. This is
 * obviously non-blocking, because exceptions are exceptional. So when no
 * exception is there, we return NULL. When there is one, we return a
 * pointer to a newly allocated copy of it.
 */
clonable_exception * thread_pool::get_exception(const size_t queue) {
    clonable_exception * e = NULL;
    ASSERT_ALWAYS(queue < exceptions.size());
    /* works both in synchronous and non-synchronous case */
    enter();
    if (!exceptions[queue].empty()) {
        e = exceptions[queue].front();
        exceptions[queue].pop();
    }
    leave();
    return e;
}