1 /*
2  * This program is free software; you can redistribute it and/or
3  * modify it under the terms of the GNU General Public License
4  * as published by the Free Software Foundation; either version 2
5  * of the License, or (at your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software Foundation,
14  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
15  */
16 
17 /** \file
18  * \ingroup bli
19  *
20  * Task pool to run tasks in parallel.
21  */
22 
23 #include <memory>
24 #include <stdlib.h>
25 #include <utility>
26 
27 #include "MEM_guardedalloc.h"
28 
29 #include "DNA_listBase.h"
30 
31 #include "BLI_math.h"
32 #include "BLI_mempool.h"
33 #include "BLI_task.h"
34 #include "BLI_threads.h"
35 
36 #ifdef WITH_TBB
37 /* Quiet top level deprecation message, unrelated to API usage here. */
38 #  define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
39 #  include <tbb/tbb.h>
40 #endif
41 
42 /* Task
43  *
44  * Unit of work to execute. This is a C++ class to work with TBB. */
45 
46 class Task {
47  public:
48   TaskPool *pool;
49   TaskRunFunction run;
50   void *taskdata;
51   bool free_taskdata;
52   TaskFreeFunction freedata;
53 
Task(TaskPool * pool,TaskRunFunction run,void * taskdata,bool free_taskdata,TaskFreeFunction freedata)54   Task(TaskPool *pool,
55        TaskRunFunction run,
56        void *taskdata,
57        bool free_taskdata,
58        TaskFreeFunction freedata)
59       : pool(pool), run(run), taskdata(taskdata), free_taskdata(free_taskdata), freedata(freedata)
60   {
61   }
62 
~Task()63   ~Task()
64   {
65     if (free_taskdata) {
66       if (freedata) {
67         freedata(pool, taskdata);
68       }
69       else {
70         MEM_freeN(taskdata);
71       }
72     }
73   }
74 
75   /* Move constructor.
76    * For performance, ensure we never copy the task and only move it.
77    * For TBB version 2017 and earlier we apply a workaround to make up for
78    * the lack of move constructor support. */
Task(Task && other)79   Task(Task &&other)
80       : pool(other.pool),
81         run(other.run),
82         taskdata(other.taskdata),
83         free_taskdata(other.free_taskdata),
84         freedata(other.freedata)
85   {
86     other.pool = NULL;
87     other.run = NULL;
88     other.taskdata = NULL;
89     other.free_taskdata = false;
90     other.freedata = NULL;
91   }
92 
93 #if defined(WITH_TBB) && TBB_INTERFACE_VERSION_MAJOR < 10
Task(const Task & other)94   Task(const Task &other)
95       : pool(other.pool),
96         run(other.run),
97         taskdata(other.taskdata),
98         free_taskdata(other.free_taskdata),
99         freedata(other.freedata)
100   {
101     ((Task &)other).pool = NULL;
102     ((Task &)other).run = NULL;
103     ((Task &)other).taskdata = NULL;
104     ((Task &)other).free_taskdata = false;
105     ((Task &)other).freedata = NULL;
106   }
107 #else
108   Task(const Task &other) = delete;
109 #endif
110 
111   Task &operator=(const Task &other) = delete;
112   Task &operator=(Task &&other) = delete;
113 
114   /* Execute task. */
operator ()() const115   void operator()() const
116   {
117 #ifdef WITH_TBB
118     tbb::this_task_arena::isolate([this] { run(pool, taskdata); });
119 #else
120     run(pool, taskdata);
121 #endif
122   }
123 };
124 
125 /* TBB Task Group.
126  *
127  * Subclass since there seems to be no other way to set priority. */
128 
129 #ifdef WITH_TBB
130 class TBBTaskGroup : public tbb::task_group {
131  public:
TBBTaskGroup(TaskPriority priority)132   TBBTaskGroup(TaskPriority priority)
133   {
134 #  if TBB_INTERFACE_VERSION_MAJOR >= 12
135     /* TODO: support priorities in TBB 2021, where they are only available as
136      * part of task arenas, no longer for task groups. Or remove support for
137      * task priorities if they are no longer useful. */
138     UNUSED_VARS(priority);
139 #  else
140     switch (priority) {
141       case TASK_PRIORITY_LOW:
142         my_context.set_priority(tbb::priority_low);
143         break;
144       case TASK_PRIORITY_HIGH:
145         my_context.set_priority(tbb::priority_normal);
146         break;
147     }
148 #endif
149   }
150 
~TBBTaskGroup()151   ~TBBTaskGroup()
152   {
153   }
154 };
155 #endif
156 
157 /* Task Pool */
158 
159 typedef enum TaskPoolType {
160   TASK_POOL_TBB,
161   TASK_POOL_TBB_SUSPENDED,
162   TASK_POOL_NO_THREADS,
163   TASK_POOL_BACKGROUND,
164   TASK_POOL_BACKGROUND_SERIAL,
165 } TaskPoolType;
166 
167 struct TaskPool {
168   TaskPoolType type;
169   bool use_threads;
170 
171   ThreadMutex user_mutex;
172   void *userdata;
173 
174   /* TBB task pool. */
175 #ifdef WITH_TBB
176   TBBTaskGroup tbb_group;
177 #endif
178   volatile bool is_suspended;
179   BLI_mempool *suspended_mempool;
180 
181   /* Background task pool. */
182   ListBase background_threads;
183   ThreadQueue *background_queue;
184   volatile bool background_is_canceling;
185 };
186 
187 /* TBB Task Pool.
188  *
189  * Task pool using the TBB scheduler for tasks. When building without TBB
190  * support or running Blender with -t 1, this reverts to single threaded.
191  *
192  * Tasks may be suspended until in all are created, to make it possible to
193  * initialize data structures and create tasks in a single pass. */
194 
tbb_task_pool_create(TaskPool * pool,TaskPriority priority)195 static void tbb_task_pool_create(TaskPool *pool, TaskPriority priority)
196 {
197   if (pool->type == TASK_POOL_TBB_SUSPENDED) {
198     pool->is_suspended = true;
199     pool->suspended_mempool = BLI_mempool_create(sizeof(Task), 512, 512, BLI_MEMPOOL_ALLOW_ITER);
200   }
201 
202 #ifdef WITH_TBB
203   if (pool->use_threads) {
204     new (&pool->tbb_group) TBBTaskGroup(priority);
205   }
206 #else
207   UNUSED_VARS(priority);
208 #endif
209 }
210 
tbb_task_pool_run(TaskPool * pool,Task && task)211 static void tbb_task_pool_run(TaskPool *pool, Task &&task)
212 {
213   if (pool->is_suspended) {
214     /* Suspended task that will be executed in work_and_wait(). */
215     Task *task_mem = (Task *)BLI_mempool_alloc(pool->suspended_mempool);
216     new (task_mem) Task(std::move(task));
217 #ifdef __GNUC__
218     /* Work around apparent compiler bug where task is not properly copied
219      * to task_mem. This appears unrelated to the use of placement new or
220      * move semantics, happens even writing to a plain C struct. Rather the
221      * call into TBB seems to have some indirect effect. */
222     std::atomic_thread_fence(std::memory_order_release);
223 #endif
224   }
225 #ifdef WITH_TBB
226   else if (pool->use_threads) {
227     /* Execute in TBB task group. */
228     pool->tbb_group.run(std::move(task));
229   }
230 #endif
231   else {
232     /* Execute immediately. */
233     task();
234   }
235 }
236 
tbb_task_pool_work_and_wait(TaskPool * pool)237 static void tbb_task_pool_work_and_wait(TaskPool *pool)
238 {
239   /* Start any suspended task now. */
240   if (pool->suspended_mempool) {
241     pool->is_suspended = false;
242 
243     BLI_mempool_iter iter;
244     BLI_mempool_iternew(pool->suspended_mempool, &iter);
245     while (Task *task = (Task *)BLI_mempool_iterstep(&iter)) {
246       tbb_task_pool_run(pool, std::move(*task));
247     }
248 
249     BLI_mempool_clear(pool->suspended_mempool);
250   }
251 
252 #ifdef WITH_TBB
253   if (pool->use_threads) {
254     /* This is called wait(), but internally it can actually do work. This
255      * matters because we don't want recursive usage of task pools to run
256      * out of threads and get stuck. */
257     pool->tbb_group.wait();
258   }
259 #endif
260 }
261 
tbb_task_pool_cancel(TaskPool * pool)262 static void tbb_task_pool_cancel(TaskPool *pool)
263 {
264 #ifdef WITH_TBB
265   if (pool->use_threads) {
266     pool->tbb_group.cancel();
267     pool->tbb_group.wait();
268   }
269 #else
270   UNUSED_VARS(pool);
271 #endif
272 }
273 
tbb_task_pool_canceled(TaskPool * pool)274 static bool tbb_task_pool_canceled(TaskPool *pool)
275 {
276 #ifdef WITH_TBB
277   if (pool->use_threads) {
278     return tbb::is_current_task_group_canceling();
279   }
280 #else
281   UNUSED_VARS(pool);
282 #endif
283 
284   return false;
285 }
286 
tbb_task_pool_free(TaskPool * pool)287 static void tbb_task_pool_free(TaskPool *pool)
288 {
289 #ifdef WITH_TBB
290   if (pool->use_threads) {
291     pool->tbb_group.~TBBTaskGroup();
292   }
293 #endif
294 
295   if (pool->suspended_mempool) {
296     BLI_mempool_destroy(pool->suspended_mempool);
297   }
298 }
299 
300 /* Background Task Pool.
301  *
302  * Fallback for running background tasks when building without TBB. */
303 
background_task_run(void * userdata)304 static void *background_task_run(void *userdata)
305 {
306   TaskPool *pool = (TaskPool *)userdata;
307   while (Task *task = (Task *)BLI_thread_queue_pop(pool->background_queue)) {
308     (*task)();
309     task->~Task();
310     MEM_freeN(task);
311   }
312   return NULL;
313 }
314 
background_task_pool_create(TaskPool * pool)315 static void background_task_pool_create(TaskPool *pool)
316 {
317   pool->background_queue = BLI_thread_queue_init();
318   BLI_threadpool_init(&pool->background_threads, background_task_run, 1);
319 }
320 
background_task_pool_run(TaskPool * pool,Task && task)321 static void background_task_pool_run(TaskPool *pool, Task &&task)
322 {
323   Task *task_mem = (Task *)MEM_mallocN(sizeof(Task), __func__);
324   new (task_mem) Task(std::move(task));
325   BLI_thread_queue_push(pool->background_queue, task_mem);
326 
327   if (BLI_available_threads(&pool->background_threads)) {
328     BLI_threadpool_insert(&pool->background_threads, pool);
329   }
330 }
331 
background_task_pool_work_and_wait(TaskPool * pool)332 static void background_task_pool_work_and_wait(TaskPool *pool)
333 {
334   /* Signal background thread to stop waiting for new tasks if none are
335    * left, and wait for tasks and thread to finish. */
336   BLI_thread_queue_nowait(pool->background_queue);
337   BLI_thread_queue_wait_finish(pool->background_queue);
338   BLI_threadpool_clear(&pool->background_threads);
339 }
340 
background_task_pool_cancel(TaskPool * pool)341 static void background_task_pool_cancel(TaskPool *pool)
342 {
343   pool->background_is_canceling = true;
344 
345   /* Remove tasks not yet started by background thread. */
346   BLI_thread_queue_nowait(pool->background_queue);
347   while (Task *task = (Task *)BLI_thread_queue_pop(pool->background_queue)) {
348     task->~Task();
349     MEM_freeN(task);
350   }
351 
352   /* Let background thread finish or cancel task it is working on. */
353   BLI_threadpool_remove(&pool->background_threads, pool);
354   pool->background_is_canceling = false;
355 }
356 
background_task_pool_canceled(TaskPool * pool)357 static bool background_task_pool_canceled(TaskPool *pool)
358 {
359   return pool->background_is_canceling;
360 }
361 
background_task_pool_free(TaskPool * pool)362 static void background_task_pool_free(TaskPool *pool)
363 {
364   background_task_pool_work_and_wait(pool);
365 
366   BLI_threadpool_end(&pool->background_threads);
367   BLI_thread_queue_free(pool->background_queue);
368 }
369 
370 /* Task Pool */
371 
task_pool_create_ex(void * userdata,TaskPoolType type,TaskPriority priority)372 static TaskPool *task_pool_create_ex(void *userdata, TaskPoolType type, TaskPriority priority)
373 {
374   const bool use_threads = BLI_task_scheduler_num_threads() > 1 && type != TASK_POOL_NO_THREADS;
375 
376   /* Background task pool uses regular TBB scheduling if available. Only when
377    * building without TBB or running with -t 1 do we need to ensure these tasks
378    * do not block the main thread. */
379   if (type == TASK_POOL_BACKGROUND && use_threads) {
380     type = TASK_POOL_TBB;
381   }
382 
383   /* Allocate task pool. */
384   TaskPool *pool = (TaskPool *)MEM_callocN(sizeof(TaskPool), "TaskPool");
385 
386   pool->type = type;
387   pool->use_threads = use_threads;
388 
389   pool->userdata = userdata;
390   BLI_mutex_init(&pool->user_mutex);
391 
392   switch (type) {
393     case TASK_POOL_TBB:
394     case TASK_POOL_TBB_SUSPENDED:
395     case TASK_POOL_NO_THREADS:
396       tbb_task_pool_create(pool, priority);
397       break;
398     case TASK_POOL_BACKGROUND:
399     case TASK_POOL_BACKGROUND_SERIAL:
400       background_task_pool_create(pool);
401       break;
402   }
403 
404   return pool;
405 }
406 
407 /**
408  * Create a normal task pool. Tasks will be executed as soon as they are added.
409  */
BLI_task_pool_create(void * userdata,TaskPriority priority)410 TaskPool *BLI_task_pool_create(void *userdata, TaskPriority priority)
411 {
412   return task_pool_create_ex(userdata, TASK_POOL_TBB, priority);
413 }
414 
415 /**
416  * Create a background task pool.
417  * In multi-threaded context, there is no differences with #BLI_task_pool_create(),
418  * but in single-threaded case it is ensured to have at least one worker thread to run on
419  * (i.e. you don't have to call #BLI_task_pool_work_and_wait
420  * on it to be sure it will be processed).
421  *
422  * \note Background pools are non-recursive
423  * (that is, you should not create other background pools in tasks assigned to a background pool,
424  * they could end never being executed, since the 'fallback' background thread is already
425  * busy with parent task in single-threaded context).
426  */
BLI_task_pool_create_background(void * userdata,TaskPriority priority)427 TaskPool *BLI_task_pool_create_background(void *userdata, TaskPriority priority)
428 {
429   return task_pool_create_ex(userdata, TASK_POOL_BACKGROUND, priority);
430 }
431 
432 /**
433  * Similar to BLI_task_pool_create() but does not schedule any tasks for execution
434  * for until BLI_task_pool_work_and_wait() is called. This helps reducing threading
435  * overhead when pushing huge amount of small initial tasks from the main thread.
436  */
BLI_task_pool_create_suspended(void * userdata,TaskPriority priority)437 TaskPool *BLI_task_pool_create_suspended(void *userdata, TaskPriority priority)
438 {
439   return task_pool_create_ex(userdata, TASK_POOL_TBB_SUSPENDED, priority);
440 }
441 
442 /**
443  * Single threaded task pool that executes pushed task immediately, for
444  * debugging purposes.
445  */
BLI_task_pool_create_no_threads(void * userdata)446 TaskPool *BLI_task_pool_create_no_threads(void *userdata)
447 {
448   return task_pool_create_ex(userdata, TASK_POOL_NO_THREADS, TASK_PRIORITY_HIGH);
449 }
450 
451 /**
452  * Task pool that executes one task after the other, possibly on different threads
453  * but never in parallel.
454  */
BLI_task_pool_create_background_serial(void * userdata,TaskPriority priority)455 TaskPool *BLI_task_pool_create_background_serial(void *userdata, TaskPriority priority)
456 {
457   return task_pool_create_ex(userdata, TASK_POOL_BACKGROUND_SERIAL, priority);
458 }
459 
BLI_task_pool_free(TaskPool * pool)460 void BLI_task_pool_free(TaskPool *pool)
461 {
462   switch (pool->type) {
463     case TASK_POOL_TBB:
464     case TASK_POOL_TBB_SUSPENDED:
465     case TASK_POOL_NO_THREADS:
466       tbb_task_pool_free(pool);
467       break;
468     case TASK_POOL_BACKGROUND:
469     case TASK_POOL_BACKGROUND_SERIAL:
470       background_task_pool_free(pool);
471       break;
472   }
473 
474   BLI_mutex_end(&pool->user_mutex);
475 
476   MEM_freeN(pool);
477 }
478 
BLI_task_pool_push(TaskPool * pool,TaskRunFunction run,void * taskdata,bool free_taskdata,TaskFreeFunction freedata)479 void BLI_task_pool_push(TaskPool *pool,
480                         TaskRunFunction run,
481                         void *taskdata,
482                         bool free_taskdata,
483                         TaskFreeFunction freedata)
484 {
485   Task task(pool, run, taskdata, free_taskdata, freedata);
486 
487   switch (pool->type) {
488     case TASK_POOL_TBB:
489     case TASK_POOL_TBB_SUSPENDED:
490     case TASK_POOL_NO_THREADS:
491       tbb_task_pool_run(pool, std::move(task));
492       break;
493     case TASK_POOL_BACKGROUND:
494     case TASK_POOL_BACKGROUND_SERIAL:
495       background_task_pool_run(pool, std::move(task));
496       break;
497   }
498 }
499 
BLI_task_pool_work_and_wait(TaskPool * pool)500 void BLI_task_pool_work_and_wait(TaskPool *pool)
501 {
502   switch (pool->type) {
503     case TASK_POOL_TBB:
504     case TASK_POOL_TBB_SUSPENDED:
505     case TASK_POOL_NO_THREADS:
506       tbb_task_pool_work_and_wait(pool);
507       break;
508     case TASK_POOL_BACKGROUND:
509     case TASK_POOL_BACKGROUND_SERIAL:
510       background_task_pool_work_and_wait(pool);
511       break;
512   }
513 }
514 
BLI_task_pool_cancel(TaskPool * pool)515 void BLI_task_pool_cancel(TaskPool *pool)
516 {
517   switch (pool->type) {
518     case TASK_POOL_TBB:
519     case TASK_POOL_TBB_SUSPENDED:
520     case TASK_POOL_NO_THREADS:
521       tbb_task_pool_cancel(pool);
522       break;
523     case TASK_POOL_BACKGROUND:
524     case TASK_POOL_BACKGROUND_SERIAL:
525       background_task_pool_cancel(pool);
526       break;
527   }
528 }
529 
BLI_task_pool_current_canceled(TaskPool * pool)530 bool BLI_task_pool_current_canceled(TaskPool *pool)
531 {
532   switch (pool->type) {
533     case TASK_POOL_TBB:
534     case TASK_POOL_TBB_SUSPENDED:
535     case TASK_POOL_NO_THREADS:
536       return tbb_task_pool_canceled(pool);
537     case TASK_POOL_BACKGROUND:
538     case TASK_POOL_BACKGROUND_SERIAL:
539       return background_task_pool_canceled(pool);
540   }
541   BLI_assert("BLI_task_pool_canceled: Control flow should not come here!");
542   return false;
543 }
544 
BLI_task_pool_user_data(TaskPool * pool)545 void *BLI_task_pool_user_data(TaskPool *pool)
546 {
547   return pool->userdata;
548 }
549 
BLI_task_pool_user_mutex(TaskPool * pool)550 ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool)
551 {
552   return &pool->user_mutex;
553 }
554