1 /*
2 * This program is free software; you can redistribute it and/or
3 * modify it under the terms of the GNU General Public License
4 * as published by the Free Software Foundation; either version 2
5 * of the License, or (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software Foundation,
14 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
15 */
16
17 /** \file
18 * \ingroup bli
19 *
20 * Task pool to run tasks in parallel.
21 */
22
#include <atomic>
#include <memory>
#include <stdlib.h>
#include <utility>
26
27 #include "MEM_guardedalloc.h"
28
29 #include "DNA_listBase.h"
30
31 #include "BLI_math.h"
32 #include "BLI_mempool.h"
33 #include "BLI_task.h"
34 #include "BLI_threads.h"
35
36 #ifdef WITH_TBB
37 /* Quiet top level deprecation message, unrelated to API usage here. */
38 # define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
39 # include <tbb/tbb.h>
40 #endif
41
42 /* Task
43 *
44 * Unit of work to execute. This is a C++ class to work with TBB. */
45
46 class Task {
47 public:
48 TaskPool *pool;
49 TaskRunFunction run;
50 void *taskdata;
51 bool free_taskdata;
52 TaskFreeFunction freedata;
53
Task(TaskPool * pool,TaskRunFunction run,void * taskdata,bool free_taskdata,TaskFreeFunction freedata)54 Task(TaskPool *pool,
55 TaskRunFunction run,
56 void *taskdata,
57 bool free_taskdata,
58 TaskFreeFunction freedata)
59 : pool(pool), run(run), taskdata(taskdata), free_taskdata(free_taskdata), freedata(freedata)
60 {
61 }
62
~Task()63 ~Task()
64 {
65 if (free_taskdata) {
66 if (freedata) {
67 freedata(pool, taskdata);
68 }
69 else {
70 MEM_freeN(taskdata);
71 }
72 }
73 }
74
75 /* Move constructor.
76 * For performance, ensure we never copy the task and only move it.
77 * For TBB version 2017 and earlier we apply a workaround to make up for
78 * the lack of move constructor support. */
Task(Task && other)79 Task(Task &&other)
80 : pool(other.pool),
81 run(other.run),
82 taskdata(other.taskdata),
83 free_taskdata(other.free_taskdata),
84 freedata(other.freedata)
85 {
86 other.pool = NULL;
87 other.run = NULL;
88 other.taskdata = NULL;
89 other.free_taskdata = false;
90 other.freedata = NULL;
91 }
92
93 #if defined(WITH_TBB) && TBB_INTERFACE_VERSION_MAJOR < 10
Task(const Task & other)94 Task(const Task &other)
95 : pool(other.pool),
96 run(other.run),
97 taskdata(other.taskdata),
98 free_taskdata(other.free_taskdata),
99 freedata(other.freedata)
100 {
101 ((Task &)other).pool = NULL;
102 ((Task &)other).run = NULL;
103 ((Task &)other).taskdata = NULL;
104 ((Task &)other).free_taskdata = false;
105 ((Task &)other).freedata = NULL;
106 }
107 #else
108 Task(const Task &other) = delete;
109 #endif
110
111 Task &operator=(const Task &other) = delete;
112 Task &operator=(Task &&other) = delete;
113
114 /* Execute task. */
operator ()() const115 void operator()() const
116 {
117 #ifdef WITH_TBB
118 tbb::this_task_arena::isolate([this] { run(pool, taskdata); });
119 #else
120 run(pool, taskdata);
121 #endif
122 }
123 };
124
125 /* TBB Task Group.
126 *
127 * Subclass since there seems to be no other way to set priority. */
128
129 #ifdef WITH_TBB
130 class TBBTaskGroup : public tbb::task_group {
131 public:
TBBTaskGroup(TaskPriority priority)132 TBBTaskGroup(TaskPriority priority)
133 {
134 # if TBB_INTERFACE_VERSION_MAJOR >= 12
135 /* TODO: support priorities in TBB 2021, where they are only available as
136 * part of task arenas, no longer for task groups. Or remove support for
137 * task priorities if they are no longer useful. */
138 UNUSED_VARS(priority);
139 # else
140 switch (priority) {
141 case TASK_PRIORITY_LOW:
142 my_context.set_priority(tbb::priority_low);
143 break;
144 case TASK_PRIORITY_HIGH:
145 my_context.set_priority(tbb::priority_normal);
146 break;
147 }
148 #endif
149 }
150
~TBBTaskGroup()151 ~TBBTaskGroup()
152 {
153 }
154 };
155 #endif
156
157 /* Task Pool */
158
/* Kind of task pool, selects which implementation handles the pool. */
enum TaskPoolType {
  /* Regular pool, tasks are started as soon as they are pushed. */
  TASK_POOL_TBB = 0,
  /* Tasks are stored until work-and-wait is called on the pool. */
  TASK_POOL_TBB_SUSPENDED = 1,
  /* Never uses threads. */
  TASK_POOL_NO_THREADS = 2,
  /* Background pool. */
  TASK_POOL_BACKGROUND = 3,
  /* Background pool executing one task after the other. */
  TASK_POOL_BACKGROUND_SERIAL = 4,
};
166
/* State of a task pool. Instances are allocated zero-initialized, members
 * that need construction are set up explicitly by the create functions. */
struct TaskPool {
  /* Implementation used by this pool. */
  TaskPoolType type;
  /* Whether tasks may be handed to the threaded scheduler. */
  bool use_threads;

  /* Mutex and opaque pointer made available to users of the pool. */
  ThreadMutex user_mutex;
  void *userdata;

  /* TBB task pool. */
#ifdef WITH_TBB
  /* Only constructed (with placement new) when `use_threads` is set. */
  TBBTaskGroup tbb_group;
#endif
  /* While set, pushed tasks are stored in `suspended_mempool` instead of run. */
  volatile bool is_suspended;
  /* Storage for suspended tasks, only allocated for suspended pools. */
  BLI_mempool *suspended_mempool;

  /* Background task pool. */
  /* Thread pool holding the single background worker thread. */
  ListBase background_threads;
  /* Queue of heap allocated tasks waiting for the background thread. */
  ThreadQueue *background_queue;
  /* Set for the duration of a cancel request. */
  volatile bool background_is_canceling;
};
186
187 /* TBB Task Pool.
188 *
189 * Task pool using the TBB scheduler for tasks. When building without TBB
190 * support or running Blender with -t 1, this reverts to single threaded.
191 *
 * Tasks may be suspended until all of them are created, to make it possible to
193 * initialize data structures and create tasks in a single pass. */
194
tbb_task_pool_create(TaskPool * pool,TaskPriority priority)195 static void tbb_task_pool_create(TaskPool *pool, TaskPriority priority)
196 {
197 if (pool->type == TASK_POOL_TBB_SUSPENDED) {
198 pool->is_suspended = true;
199 pool->suspended_mempool = BLI_mempool_create(sizeof(Task), 512, 512, BLI_MEMPOOL_ALLOW_ITER);
200 }
201
202 #ifdef WITH_TBB
203 if (pool->use_threads) {
204 new (&pool->tbb_group) TBBTaskGroup(priority);
205 }
206 #else
207 UNUSED_VARS(priority);
208 #endif
209 }
210
tbb_task_pool_run(TaskPool * pool,Task && task)211 static void tbb_task_pool_run(TaskPool *pool, Task &&task)
212 {
213 if (pool->is_suspended) {
214 /* Suspended task that will be executed in work_and_wait(). */
215 Task *task_mem = (Task *)BLI_mempool_alloc(pool->suspended_mempool);
216 new (task_mem) Task(std::move(task));
217 #ifdef __GNUC__
218 /* Work around apparent compiler bug where task is not properly copied
219 * to task_mem. This appears unrelated to the use of placement new or
220 * move semantics, happens even writing to a plain C struct. Rather the
221 * call into TBB seems to have some indirect effect. */
222 std::atomic_thread_fence(std::memory_order_release);
223 #endif
224 }
225 #ifdef WITH_TBB
226 else if (pool->use_threads) {
227 /* Execute in TBB task group. */
228 pool->tbb_group.run(std::move(task));
229 }
230 #endif
231 else {
232 /* Execute immediately. */
233 task();
234 }
235 }
236
tbb_task_pool_work_and_wait(TaskPool * pool)237 static void tbb_task_pool_work_and_wait(TaskPool *pool)
238 {
239 /* Start any suspended task now. */
240 if (pool->suspended_mempool) {
241 pool->is_suspended = false;
242
243 BLI_mempool_iter iter;
244 BLI_mempool_iternew(pool->suspended_mempool, &iter);
245 while (Task *task = (Task *)BLI_mempool_iterstep(&iter)) {
246 tbb_task_pool_run(pool, std::move(*task));
247 }
248
249 BLI_mempool_clear(pool->suspended_mempool);
250 }
251
252 #ifdef WITH_TBB
253 if (pool->use_threads) {
254 /* This is called wait(), but internally it can actually do work. This
255 * matters because we don't want recursive usage of task pools to run
256 * out of threads and get stuck. */
257 pool->tbb_group.wait();
258 }
259 #endif
260 }
261
tbb_task_pool_cancel(TaskPool * pool)262 static void tbb_task_pool_cancel(TaskPool *pool)
263 {
264 #ifdef WITH_TBB
265 if (pool->use_threads) {
266 pool->tbb_group.cancel();
267 pool->tbb_group.wait();
268 }
269 #else
270 UNUSED_VARS(pool);
271 #endif
272 }
273
tbb_task_pool_canceled(TaskPool * pool)274 static bool tbb_task_pool_canceled(TaskPool *pool)
275 {
276 #ifdef WITH_TBB
277 if (pool->use_threads) {
278 return tbb::is_current_task_group_canceling();
279 }
280 #else
281 UNUSED_VARS(pool);
282 #endif
283
284 return false;
285 }
286
tbb_task_pool_free(TaskPool * pool)287 static void tbb_task_pool_free(TaskPool *pool)
288 {
289 #ifdef WITH_TBB
290 if (pool->use_threads) {
291 pool->tbb_group.~TBBTaskGroup();
292 }
293 #endif
294
295 if (pool->suspended_mempool) {
296 BLI_mempool_destroy(pool->suspended_mempool);
297 }
298 }
299
300 /* Background Task Pool.
301 *
 * Fallback for running background tasks when building without TBB or when
 * running single threaded. Serial background pools always use it. */
303
background_task_run(void * userdata)304 static void *background_task_run(void *userdata)
305 {
306 TaskPool *pool = (TaskPool *)userdata;
307 while (Task *task = (Task *)BLI_thread_queue_pop(pool->background_queue)) {
308 (*task)();
309 task->~Task();
310 MEM_freeN(task);
311 }
312 return NULL;
313 }
314
background_task_pool_create(TaskPool * pool)315 static void background_task_pool_create(TaskPool *pool)
316 {
317 pool->background_queue = BLI_thread_queue_init();
318 BLI_threadpool_init(&pool->background_threads, background_task_run, 1);
319 }
320
background_task_pool_run(TaskPool * pool,Task && task)321 static void background_task_pool_run(TaskPool *pool, Task &&task)
322 {
323 Task *task_mem = (Task *)MEM_mallocN(sizeof(Task), __func__);
324 new (task_mem) Task(std::move(task));
325 BLI_thread_queue_push(pool->background_queue, task_mem);
326
327 if (BLI_available_threads(&pool->background_threads)) {
328 BLI_threadpool_insert(&pool->background_threads, pool);
329 }
330 }
331
background_task_pool_work_and_wait(TaskPool * pool)332 static void background_task_pool_work_and_wait(TaskPool *pool)
333 {
334 /* Signal background thread to stop waiting for new tasks if none are
335 * left, and wait for tasks and thread to finish. */
336 BLI_thread_queue_nowait(pool->background_queue);
337 BLI_thread_queue_wait_finish(pool->background_queue);
338 BLI_threadpool_clear(&pool->background_threads);
339 }
340
background_task_pool_cancel(TaskPool * pool)341 static void background_task_pool_cancel(TaskPool *pool)
342 {
343 pool->background_is_canceling = true;
344
345 /* Remove tasks not yet started by background thread. */
346 BLI_thread_queue_nowait(pool->background_queue);
347 while (Task *task = (Task *)BLI_thread_queue_pop(pool->background_queue)) {
348 task->~Task();
349 MEM_freeN(task);
350 }
351
352 /* Let background thread finish or cancel task it is working on. */
353 BLI_threadpool_remove(&pool->background_threads, pool);
354 pool->background_is_canceling = false;
355 }
356
background_task_pool_canceled(TaskPool * pool)357 static bool background_task_pool_canceled(TaskPool *pool)
358 {
359 return pool->background_is_canceling;
360 }
361
background_task_pool_free(TaskPool * pool)362 static void background_task_pool_free(TaskPool *pool)
363 {
364 background_task_pool_work_and_wait(pool);
365
366 BLI_threadpool_end(&pool->background_threads);
367 BLI_thread_queue_free(pool->background_queue);
368 }
369
370 /* Task Pool */
371
task_pool_create_ex(void * userdata,TaskPoolType type,TaskPriority priority)372 static TaskPool *task_pool_create_ex(void *userdata, TaskPoolType type, TaskPriority priority)
373 {
374 const bool use_threads = BLI_task_scheduler_num_threads() > 1 && type != TASK_POOL_NO_THREADS;
375
376 /* Background task pool uses regular TBB scheduling if available. Only when
377 * building without TBB or running with -t 1 do we need to ensure these tasks
378 * do not block the main thread. */
379 if (type == TASK_POOL_BACKGROUND && use_threads) {
380 type = TASK_POOL_TBB;
381 }
382
383 /* Allocate task pool. */
384 TaskPool *pool = (TaskPool *)MEM_callocN(sizeof(TaskPool), "TaskPool");
385
386 pool->type = type;
387 pool->use_threads = use_threads;
388
389 pool->userdata = userdata;
390 BLI_mutex_init(&pool->user_mutex);
391
392 switch (type) {
393 case TASK_POOL_TBB:
394 case TASK_POOL_TBB_SUSPENDED:
395 case TASK_POOL_NO_THREADS:
396 tbb_task_pool_create(pool, priority);
397 break;
398 case TASK_POOL_BACKGROUND:
399 case TASK_POOL_BACKGROUND_SERIAL:
400 background_task_pool_create(pool);
401 break;
402 }
403
404 return pool;
405 }
406
407 /**
408 * Create a normal task pool. Tasks will be executed as soon as they are added.
409 */
BLI_task_pool_create(void * userdata,TaskPriority priority)410 TaskPool *BLI_task_pool_create(void *userdata, TaskPriority priority)
411 {
412 return task_pool_create_ex(userdata, TASK_POOL_TBB, priority);
413 }
414
415 /**
416 * Create a background task pool.
417 * In multi-threaded context, there is no differences with #BLI_task_pool_create(),
418 * but in single-threaded case it is ensured to have at least one worker thread to run on
419 * (i.e. you don't have to call #BLI_task_pool_work_and_wait
420 * on it to be sure it will be processed).
421 *
422 * \note Background pools are non-recursive
423 * (that is, you should not create other background pools in tasks assigned to a background pool,
424 * they could end never being executed, since the 'fallback' background thread is already
425 * busy with parent task in single-threaded context).
426 */
BLI_task_pool_create_background(void * userdata,TaskPriority priority)427 TaskPool *BLI_task_pool_create_background(void *userdata, TaskPriority priority)
428 {
429 return task_pool_create_ex(userdata, TASK_POOL_BACKGROUND, priority);
430 }
431
432 /**
433 * Similar to BLI_task_pool_create() but does not schedule any tasks for execution
434 * for until BLI_task_pool_work_and_wait() is called. This helps reducing threading
435 * overhead when pushing huge amount of small initial tasks from the main thread.
436 */
BLI_task_pool_create_suspended(void * userdata,TaskPriority priority)437 TaskPool *BLI_task_pool_create_suspended(void *userdata, TaskPriority priority)
438 {
439 return task_pool_create_ex(userdata, TASK_POOL_TBB_SUSPENDED, priority);
440 }
441
442 /**
443 * Single threaded task pool that executes pushed task immediately, for
444 * debugging purposes.
445 */
BLI_task_pool_create_no_threads(void * userdata)446 TaskPool *BLI_task_pool_create_no_threads(void *userdata)
447 {
448 return task_pool_create_ex(userdata, TASK_POOL_NO_THREADS, TASK_PRIORITY_HIGH);
449 }
450
451 /**
452 * Task pool that executes one task after the other, possibly on different threads
453 * but never in parallel.
454 */
BLI_task_pool_create_background_serial(void * userdata,TaskPriority priority)455 TaskPool *BLI_task_pool_create_background_serial(void *userdata, TaskPriority priority)
456 {
457 return task_pool_create_ex(userdata, TASK_POOL_BACKGROUND_SERIAL, priority);
458 }
459
BLI_task_pool_free(TaskPool * pool)460 void BLI_task_pool_free(TaskPool *pool)
461 {
462 switch (pool->type) {
463 case TASK_POOL_TBB:
464 case TASK_POOL_TBB_SUSPENDED:
465 case TASK_POOL_NO_THREADS:
466 tbb_task_pool_free(pool);
467 break;
468 case TASK_POOL_BACKGROUND:
469 case TASK_POOL_BACKGROUND_SERIAL:
470 background_task_pool_free(pool);
471 break;
472 }
473
474 BLI_mutex_end(&pool->user_mutex);
475
476 MEM_freeN(pool);
477 }
478
BLI_task_pool_push(TaskPool * pool,TaskRunFunction run,void * taskdata,bool free_taskdata,TaskFreeFunction freedata)479 void BLI_task_pool_push(TaskPool *pool,
480 TaskRunFunction run,
481 void *taskdata,
482 bool free_taskdata,
483 TaskFreeFunction freedata)
484 {
485 Task task(pool, run, taskdata, free_taskdata, freedata);
486
487 switch (pool->type) {
488 case TASK_POOL_TBB:
489 case TASK_POOL_TBB_SUSPENDED:
490 case TASK_POOL_NO_THREADS:
491 tbb_task_pool_run(pool, std::move(task));
492 break;
493 case TASK_POOL_BACKGROUND:
494 case TASK_POOL_BACKGROUND_SERIAL:
495 background_task_pool_run(pool, std::move(task));
496 break;
497 }
498 }
499
BLI_task_pool_work_and_wait(TaskPool * pool)500 void BLI_task_pool_work_and_wait(TaskPool *pool)
501 {
502 switch (pool->type) {
503 case TASK_POOL_TBB:
504 case TASK_POOL_TBB_SUSPENDED:
505 case TASK_POOL_NO_THREADS:
506 tbb_task_pool_work_and_wait(pool);
507 break;
508 case TASK_POOL_BACKGROUND:
509 case TASK_POOL_BACKGROUND_SERIAL:
510 background_task_pool_work_and_wait(pool);
511 break;
512 }
513 }
514
BLI_task_pool_cancel(TaskPool * pool)515 void BLI_task_pool_cancel(TaskPool *pool)
516 {
517 switch (pool->type) {
518 case TASK_POOL_TBB:
519 case TASK_POOL_TBB_SUSPENDED:
520 case TASK_POOL_NO_THREADS:
521 tbb_task_pool_cancel(pool);
522 break;
523 case TASK_POOL_BACKGROUND:
524 case TASK_POOL_BACKGROUND_SERIAL:
525 background_task_pool_cancel(pool);
526 break;
527 }
528 }
529
BLI_task_pool_current_canceled(TaskPool * pool)530 bool BLI_task_pool_current_canceled(TaskPool *pool)
531 {
532 switch (pool->type) {
533 case TASK_POOL_TBB:
534 case TASK_POOL_TBB_SUSPENDED:
535 case TASK_POOL_NO_THREADS:
536 return tbb_task_pool_canceled(pool);
537 case TASK_POOL_BACKGROUND:
538 case TASK_POOL_BACKGROUND_SERIAL:
539 return background_task_pool_canceled(pool);
540 }
541 BLI_assert("BLI_task_pool_canceled: Control flow should not come here!");
542 return false;
543 }
544
BLI_task_pool_user_data(TaskPool * pool)545 void *BLI_task_pool_user_data(TaskPool *pool)
546 {
547 return pool->userdata;
548 }
549
BLI_task_pool_user_mutex(TaskPool * pool)550 ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool)
551 {
552 return &pool->user_mutex;
553 }
554