1 /* -*- mode: c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; -*- */
2 /* ***** BEGIN LICENSE BLOCK *****
3 * This file is part of openfx-supportext <https://github.com/devernay/openfx-supportext>,
4 * Copyright (C) 2013-2018 INRIA
5 *
6 * openfx-supportext is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * openfx-supportext is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with openfx-supportext. If not, see <http://www.gnu.org/licenses/gpl-2.0.html>
18 * ***** END LICENSE BLOCK ***** */
19
20 /*
21 * A plugin-side multithread suite.
22 * Can be used in place of a faulty or missing host MultiThread Suite.
23 *
24 * This suite counts the number of running threads lauched by this suite only, and reports the number of free slots in multiThreadNumCPUs.
25 *
26 * The number of free slots is shared between all plugins of a multibundle.
27 */
28
29 //#define DEBUG_STDOUT // output debug messages to stdout
30
31 #include "ofxsThreadSuite.h"
32 #include "ofxsMultiThread.h"
33
34 #include <cassert>
35 #include <vector>
36 #include <map>
37 #ifdef DEBUG_STDOUT
38 #include <iostream>
39 #define DBG(x) (x)
40 #else
41 #define DBG(x) (void)0
42 #endif
43
44 // use TinyThread 1.2 from https://gitorious.org/tinythread/tinythreadpp
45 // for portable C++11-like threads
46 #include "tinythread.h"
47 // use our version of fast_mutex.h, which has bug fixes
48 //#include "fast_mutex.h"
49
50 #include "ofxCore.h"
51 #include "ofxMultiThread.h"
52 #include "ofxsImageEffect.h"
53
54 using namespace tthread;
55 using std::map;
56 using std::vector;
57 #ifdef DEBUG_STDOUT
58 using std::cout;
59 using std::endl;
60 #endif
61
62 namespace {
63
64 template<typename T>
65 static inline void
unused(const T &)66 unused(const T&) {}
67
68 OfxStatus multiThreadNumCPUs(unsigned int *nCPUs);
69
70 const unsigned nprocs = thread::hardware_concurrency();
71
72 mutex occupancyLock; // protects occupancy
73 unsigned occupancy = 0;
74
75 mutex threadIndexesLock; // protects threadIndexes
76 map<thread::id, unsigned int> threadIndexes;
77
78
79 struct ThreadArgs
80 {
81 OfxThreadFunctionV1* func;
82 unsigned int threadIndex;
83 unsigned int threadMax;
84 void *customArg;
85 OfxStatus ret;
86 };
87
88 void
threadFunction(void * _args)89 threadFunction(void *_args)
90 {
91 ThreadArgs* args = (ThreadArgs*)_args;
92 assert(args->threadIndex < args->threadMax);
93
94 args->ret = kOfxStatOK;
95 try {
96 args->func(args->threadIndex, args->threadMax, args->customArg);
97 } catch (const std::bad_alloc) {
98 args->ret = kOfxStatErrMemory;
99 } catch (...) {
100 args->ret = kOfxStatFailed;
101 }
102 }
103
104 /**@brief Function to spawn SMP threads
105
106 \arg func The function to call in each thread.
107 \arg nThreads The number of threads to launch
108 \arg customArg The paramter to pass to customArg of func in each thread.
109
110 This function will spawn nThreads separate threads of computation (typically one per CPU)
111 to allow something to perform symmetric multi processing. Each thread will call 'func' passing
112 in the index of the thread and the number of threads actually launched.
113
114 multiThread will not return until all the spawned threads have returned. It is up to the host
115 how it waits for all the threads to return (busy wait, blocking, whatever).
116
117 \e nThreads can be more than the value returned by multiThreadNumCPUs, however the threads will
118 be limitted to the number of CPUs returned by multiThreadNumCPUs.
119
120 This function cannot be called recursively.
121
122 @returns
123 - ::kOfxStatOK, the function func has executed and returned sucessfully
124 - ::kOfxStatFailed, the threading function failed to launch
125 - ::kOfxStatErrExists, failed in an attempt to call multiThread recursively,
126
127 */
128 // Note that the thread indexes are from 0 to nThreads-1.
129 // http://openfx.sourceforge.net/Documentation/1.3/ofxProgrammingReference.html#OfxMultiThreadSuiteV1_multiThread
multiThread(OfxThreadFunctionV1 func,unsigned int nThreads,void * customArg)130 OfxStatus multiThread(OfxThreadFunctionV1 func,
131 unsigned int nThreads,
132 void *customArg)
133 {
134 if (!func) {
135 return kOfxStatFailed;
136 }
137
138 // check if this is a spawned thread, if yes return kOfxStatErrExists
139 {
140 lock_guard<mutex> guard(threadIndexesLock);
141 if ( threadIndexes.find( this_thread::get_id() ) != threadIndexes.end() ) {
142 return kOfxStatErrExists;
143 }
144 }
145
146 unsigned int maxConcurrentThread;
147 OfxStatus st = multiThreadNumCPUs(&maxConcurrentThread);
148 if (st != kOfxStatOK) {
149 return st;
150 }
151
152 // from the documentation:
153 // "nThreads can be more than the value returned by multiThreadNumCPUs, however
154 // the threads will be limitted to the number of CPUs returned by multiThreadNumCPUs."
155
156 if ( (nThreads == 1) || (maxConcurrentThread <= 1) ) {
157 int retval;
158 {
159 lock_guard<mutex> guard(occupancyLock);
160 ++occupancy;
161 }
162 try {
163 for (unsigned int i = 0; i < nThreads; ++i) {
164 func(i, nThreads, customArg);
165 }
166
167 retval = kOfxStatOK;
168 } catch (...) {
169 retval = kOfxStatFailed;
170 }
171 {
172 lock_guard<mutex> guard(occupancyLock);
173 --occupancy;
174 }
175 return retval;
176 }
177
178 // at most maxConcurrentThread should be running at the same time
179 vector<thread*> threads(nThreads, NULL);
180 vector<thread::id> threadIDs(nThreads);
181 vector<ThreadArgs> threadArgs(nThreads);
182 for (unsigned int i = 0; i < nThreads; ++i) {
183 threadArgs[i].func = func;
184 threadArgs[i].threadIndex = i;
185 threadArgs[i].threadMax = nThreads;
186 threadArgs[i].customArg = customArg;
187 threadArgs[i].ret = kOfxStatFailed;
188 }
189 unsigned int i = 0; // index of next thread to launch
190 unsigned int running = 0; // number of running threads
191 unsigned int j = 0; // index of first running thread. all threads before this one are finished running
192 while (j < nThreads) {
193 // have no more than maxConcurrentThread threads launched at the same time
194 int threadsStarted = 0;
195 {
196 lock_guard<mutex> guard(threadIndexesLock);
197
198 while (i < nThreads && running < maxConcurrentThread) {
199 threads[i] = new thread(threadFunction, &threadArgs[i]);
200 threadIDs[i] = threads[i]->get_id();
201 assert( threadIndexes.find(threadIDs[i]) == threadIndexes.end() );
202 threadIndexes[threadIDs[i]] = threadArgs[i].threadIndex;
203 ++i;
204 ++running;
205 ++threadsStarted;
206 }
207 }
208
209 ///We just started threadsStarted threads
210 {
211 lock_guard<mutex> guard(occupancyLock);
212 occupancy += threadsStarted;
213 }
214
215 // now we've got at most maxConcurrentThread running. wait for each thread and launch a new one
216 threads[j]->join();
217 assert( !threads[j]->joinable() );
218 {
219 lock_guard<mutex> guard(threadIndexesLock);
220 map<thread::id, unsigned int>::iterator it = threadIndexes.find(threadIDs[j]);
221 assert( it != threadIndexes.end() );
222 if ( it != threadIndexes.end() ) {
223 threadIndexes.erase(it);
224 }
225 }
226 delete threads[j];
227 threads[j] = NULL;
228 threadIDs[j] = thread::id();
229 ++j;
230 --running;
231
232 // We just stopped 1 thread
233 {
234 lock_guard<mutex> guard(occupancyLock);
235 --occupancy;
236 }
237 }
238 assert(running == 0);
239
240 // check the return status of each thread, return the first error found
241 for (unsigned int i = 0; i < nThreads; ++i) {
242 OfxStatus stat = threadArgs[i].ret;
243 if (stat != kOfxStatOK) {
244 return stat;
245 }
246 }
247
248 return kOfxStatOK;
249 }
250
251 /**@brief Function which indicates the number of CPUs available for SMP processing
252
253 \arg nCPUs pointer to an integer where the result is returned
254
255 This value may be less than the actual number of CPUs on a machine, as the host may reserve other CPUs for itself.
256
257 @returns
258 - ::kOfxStatOK, all was OK and the maximum number of threads is in nThreads.
259 - ::kOfxStatFailed, the function failed to get the number of CPUs
260 */
261 // http://openfx.sourceforge.net/Documentation/1.3/ofxProgrammingReference.html#OfxMultiThreadSuiteV1_multiThreadNumCPUs
multiThreadNumCPUs(unsigned int * nCPUs)262 OfxStatus multiThreadNumCPUs(unsigned int *nCPUs)
263 {
264 lock_guard<mutex> guard(occupancyLock);
265 *nCPUs = occupancy >= nprocs ? 1 : (nprocs - occupancy);
266 DBG(std::cout << "numCPUs=" << *nCPUs << endl);
267 return kOfxStatOK;
268 }
269
270 /**@brief Function which indicates the index of the current thread
271
272 \arg threadIndex pointer to an integer where the result is returned
273
274 This function returns the thread index, which is the same as the \e threadIndex argument passed to the ::OfxThreadFunctionV1.
275
276 If there are no threads currently spawned, then this function will set threadIndex to 0
277
278 @returns
279 - ::kOfxStatOK, all was OK and the maximum number of threads is in nThreads.
280 - ::kOfxStatFailed, the function failed to return an index
281 */
282 // http://openfx.sourceforge.net/Documentation/1.3/ofxProgrammingReference.html#OfxMultiThreadSuiteV1_multiThreadIndex
283 // Note that the thread indexes are from 0 to nThreads-1, so a return value of 0 does not mean that it's not a spawned thread
284 // (use multiThreadIsSpawnedThread() to check if it's a spawned thread)
multiThreadIndex(unsigned int * threadIndex)285 OfxStatus multiThreadIndex(unsigned int *threadIndex)
286 {
287 if (!threadIndex) {
288 return kOfxStatFailed;
289 }
290
291 lock_guard<mutex> guard(threadIndexesLock);
292 map<thread::id, unsigned int>::const_iterator it = threadIndexes.find( this_thread::get_id() );
293 if ( it != threadIndexes.end() ) {
294 *threadIndex = it->second;
295 } else {
296 *threadIndex = 0;
297 }
298
299 return kOfxStatOK;
300 }
301
302 /**@brief Function to enquire if the calling thread was spawned by multiThread
303
304 @returns
305 - 0 if the thread is not one spawned by multiThread
306 - 1 if the thread was spawned by multiThread
307 */
308 // http://openfx.sourceforge.net/Documentation/1.3/ofxProgrammingReference.html#OfxMultiThreadSuiteV1_multiThreadIsSpawnedThread
multiThreadIsSpawnedThread(void)309 int multiThreadIsSpawnedThread(void)
310 {
311 lock_guard<mutex> guard(threadIndexesLock);
312 return threadIndexes.find( this_thread::get_id() ) != threadIndexes.end();
313 }
314
315 /** @brief Create a mutex
316
317 \arg mutex - where the new handle is returned
318 \arg count - initial lock count on the mutex. This can be negative.
319
320 Creates a new mutex with lockCount locks on the mutex intially set.
321
322 @returns
323 - kOfxStatOK - mutex is now valid and ready to go
324 */
325 // http://openfx.sourceforge.net/Documentation/1.3/ofxProgrammingReference.html#OfxMultiThreadSuiteV1_mutexCreate
mutexCreate(OfxMutexHandle * mutex,int lockCount)326 OfxStatus mutexCreate(OfxMutexHandle *mutex, int lockCount)
327 {
328 if (!mutex) {
329 return kOfxStatFailed;
330 }
331
332 // suite functions should not throw
333 try {
334 recursive_mutex* m = new recursive_mutex();
335 for (int i = 0; i < lockCount; ++i) {
336 m->lock();
337 }
338 *mutex = (OfxMutexHandle)(m);
339
340 return kOfxStatOK;
341 } catch (std::bad_alloc) {
342 DBG(cout << "mutexCreate(): memory error.\n");
343
344 return kOfxStatErrMemory;
345 } catch (const std::exception & e) {
346 DBG(cout << "mutexCreate(): " << e.what() << endl);
347 unused(e);
348
349 return kOfxStatErrUnknown;
350 } catch (...) {
351 DBG(cout << "mutexCreate(): unknown error.\n");
352
353 return kOfxStatErrUnknown;
354 }
355 }
356
357 /** @brief Destroy a mutex
358
359 Destroys a mutex intially created by mutexCreate.
360
361 @returns
362 - kOfxStatOK - if it destroyed the mutex
363 - kOfxStatErrBadHandle - if the handle was bad
364 */
365 // http://openfx.sourceforge.net/Documentation/1.3/ofxProgrammingReference.html#OfxMultiThreadSuiteV1_mutexDestroy
mutexDestroy(const OfxMutexHandle mutex)366 OfxStatus mutexDestroy(const OfxMutexHandle mutex)
367 {
368 if (mutex == 0) {
369 return kOfxStatErrBadHandle;
370 }
371 // suite functions should not throw
372 try {
373 delete reinterpret_cast<recursive_mutex*>(mutex);
374
375 return kOfxStatOK;
376 } catch (std::bad_alloc) {
377 DBG(cout << "mutexDestroy(): memory error.\n");
378
379 return kOfxStatErrMemory;
380 } catch (const std::exception & e) {
381 DBG(cout << "mutexDestroy(): " << e.what() << endl);
382 unused(e);
383
384 return kOfxStatErrUnknown;
385 } catch (...) {
386 DBG(cout << "mutexDestroy(): unknown error.\n");
387
388 return kOfxStatErrUnknown;
389 }
390 }
391
392 /** @brief Blocking lock on the mutex
393
394 This trys to lock a mutex and blocks the thread it is in until the lock suceeds.
395
396 A sucessful lock causes the mutex's lock count to be increased by one and to block any other calls to lock the mutex until it is unlocked.
397
398 @returns
399 - kOfxStatOK - if it got the lock
400 - kOfxStatErrBadHandle - if the handle was bad
401 */
402 // http://openfx.sourceforge.net/Documentation/1.3/ofxProgrammingReference.html#OfxMultiThreadSuiteV1_mutexLock
mutexLock(const OfxMutexHandle mutex)403 OfxStatus mutexLock(const OfxMutexHandle mutex)
404 {
405 if (mutex == 0) {
406 return kOfxStatErrBadHandle;
407 }
408 // suite functions should not throw
409 try {
410 reinterpret_cast<recursive_mutex*>(mutex)->lock();
411
412 return kOfxStatOK;
413 } catch (std::bad_alloc) {
414 DBG(cout << "mutexLock(): memory error.\n");
415
416 return kOfxStatErrMemory;
417 } catch (const std::exception & e) {
418 DBG(cout << "mutexLock(): " << e.what() << endl);
419 unused(e);
420
421 return kOfxStatErrUnknown;
422 } catch (...) {
423 DBG(cout << "mutexLock(): unknown error.\n");
424
425 return kOfxStatErrUnknown;
426 }
427 }
428
429 /** @brief Unlock the mutex
430
431 This unlocks a mutex. Unlocking a mutex decreases its lock count by one.
432
433 @returns
434 - kOfxStatOK if it released the lock
435 - kOfxStatErrBadHandle if the handle was bad
436 */
437 // http://openfx.sourceforge.net/Documentation/1.3/ofxProgrammingReference.html#OfxMultiThreadSuiteV1_mutexUnLock
mutexUnLock(const OfxMutexHandle mutex)438 OfxStatus mutexUnLock(const OfxMutexHandle mutex)
439 {
440 if (mutex == 0) {
441 return kOfxStatErrBadHandle;
442 }
443 // suite functions should not throw
444 try {
445 reinterpret_cast<recursive_mutex*>(mutex)->unlock();
446
447 return kOfxStatOK;
448 } catch (std::bad_alloc) {
449 DBG(cout << "mutexUnLock(): memory error.\n");
450
451 return kOfxStatErrMemory;
452 } catch (const std::exception & e) {
453 DBG(cout << "mutexUnLock(): " << e.what() << endl);
454 unused(e);
455
456 return kOfxStatErrUnknown;
457 } catch (...) {
458 DBG(cout << "mutexUnLock(): unknown error.\n");
459
460 return kOfxStatErrUnknown;
461 }
462 }
463
464 /** @brief Non blocking attempt to lock the mutex
465
466 This attempts to lock a mutex, if it cannot, it returns and says so, rather than blocking.
467
468 A sucessful lock causes the mutex's lock count to be increased by one, if the lock did not suceed, the call returns immediately and the lock count remains unchanged.
469
470 @returns
471 - kOfxStatOK - if it got the lock
472 - kOfxStatFailed - if it did not get the lock
473 - kOfxStatErrBadHandle - if the handle was bad
474 */
475 // http://openfx.sourceforge.net/Documentation/1.3/ofxProgrammingReference.html#OfxMultiThreadSuiteV1_mutexTryLock
mutexTryLock(const OfxMutexHandle mutex)476 OfxStatus mutexTryLock(const OfxMutexHandle mutex)
477 {
478 if (mutex == 0) {
479 return kOfxStatErrBadHandle;
480 }
481 // suite functions should not throw
482 try {
483 if ( reinterpret_cast<recursive_mutex*>(mutex)->try_lock() ) {
484 return kOfxStatOK;
485 } else {
486 return kOfxStatFailed;
487 }
488 } catch (std::bad_alloc) {
489 DBG(cout << "mutexTryLock(): memory error.\n");
490
491 return kOfxStatErrMemory;
492 } catch (const std::exception & e) {
493 DBG(cout << "mutexTryLock(): " << e.what() << endl);
494 unused(e);
495
496 return kOfxStatErrUnknown;
497 } catch (...) {
498 DBG(cout << "mutexTryLock(): unknown error.\n");
499
500 return kOfxStatErrUnknown;
501 }
502 }
503
504 OfxMultiThreadSuiteV1 threadSuite = {
505 multiThread,
506 multiThreadNumCPUs,
507 multiThreadIndex,
508 multiThreadIsSpawnedThread,
509 mutexCreate,
510 mutexDestroy,
511 mutexLock,
512 mutexUnLock,
513 mutexTryLock
514 };
515
516 OfxMultiThreadSuiteV1 mutexSuite = {
517 NULL,
518 NULL,
519 NULL,
520 NULL,
521 mutexCreate,
522 mutexDestroy,
523 mutexLock,
524 mutexUnLock,
525 mutexTryLock
526 };
527
528 } // namespace {
529
530 namespace OFX {
531
532 extern ImageEffectHostDescription gHostDescription;
533 namespace Private {
534 /** @brief Pointer to the plugin-side threading suite, can be used to replace gThreadSuite */
535 OfxMultiThreadSuiteV1 *gPluginThreadSuite = &threadSuite;
536 extern int gLoadCount;
537 extern OfxMultiThreadSuiteV1 *gThreadSuite;
538 }
539
ofxsThreadSuiteCheck()540 void ofxsThreadSuiteCheck()
541 {
542 if (Private::gThreadSuite == NULL) {
543 DBG(cout << "ofxsThreadSuiteCheck(): host has no thread suite.\n");
544 } else if (Private::gThreadSuite->multiThread == NULL) {
545 DBG(cout << "ofxsThreadSuiteCheck(): multiThread is NULL.\n");
546 } else if (Private::gThreadSuite->multiThreadNumCPUs == NULL) {
547 DBG(cout << "ofxsThreadSuiteCheck(): multiThreadNumCPUs is NULL.\n");
548 } else if (Private::gThreadSuite->multiThreadIndex == NULL) {
549 DBG(cout << "ofxsThreadSuiteCheck(): multiThreadIndex is NULL.\n");
550 } else if (Private::gThreadSuite->multiThreadIsSpawnedThread == NULL) {
551 DBG(cout << "ofxsThreadSuiteCheck(): multiThreadIsSpawnedThread is NULL.\n");
552 } else if (Private::gThreadSuite->mutexCreate == NULL) {
553 DBG(cout << "ofxsThreadSuiteCheck(): mutexCreate is NULL.\n");
554 } else if (Private::gThreadSuite->mutexDestroy == NULL) {
555 DBG(cout << "ofxsThreadSuiteCheck(): mutexDestroy is NULL.\n");
556 } else if (Private::gThreadSuite->mutexLock == NULL) {
557 DBG(cout << "ofxsThreadSuiteCheck(): mutexLock is NULL.\n");
558 } else if (Private::gThreadSuite->mutexUnLock == NULL) {
559 DBG(cout << "ofxsThreadSuiteCheck(): mutexUnLock is NULL.\n");
560 } else if (Private::gThreadSuite->mutexTryLock == NULL) {
561 DBG(cout << "ofxsThreadSuiteCheck(): mutexTryLock is NULL.\n");
562 }
563 // do it even if gLoadCount > 1. the load action is never multithreaded anyway
564 if (Private::gThreadSuite != &threadSuite &&
565 (Private::gThreadSuite == NULL ||
566 Private::gThreadSuite->multiThread == NULL ||
567 Private::gThreadSuite->multiThreadNumCPUs == NULL ||
568 Private::gThreadSuite->multiThreadIndex == NULL ||
569 Private::gThreadSuite->multiThreadIsSpawnedThread == NULL ||
570 gHostDescription.hostName.compare(0, 14, "DaVinciResolve") == 0)) { // Resolve has a dummy MT Suite, use our own
571 DBG(cout << "ofxsThreadSuiteCheck(): replacing host suite.\n");
572 Private::gThreadSuite = &threadSuite;
573 }
574 if (Private::gThreadSuite != &mutexSuite &&
575 (Private::gThreadSuite->mutexCreate == NULL ||
576 Private::gThreadSuite->mutexDestroy == NULL ||
577 Private::gThreadSuite->mutexLock == NULL ||
578 Private::gThreadSuite->mutexUnLock == NULL ||
579 Private::gThreadSuite->mutexTryLock == NULL ||
580 gHostDescription.hostName.compare(0, 22, "com.sony.Catalyst.Edit") == 0)) { // Sony Catalyst Edit (as of version 2015.15) misses the mutex functions
581 DBG(cout << "ofxsThreadSuiteCheck(): replacing host mutex suite.\n");
582 mutexSuite.multiThread = Private::gThreadSuite->multiThread;
583 mutexSuite.multiThreadNumCPUs = Private::gThreadSuite->multiThreadNumCPUs;
584 mutexSuite.multiThreadIndex = Private::gThreadSuite->multiThreadIndex;
585 mutexSuite.multiThreadIsSpawnedThread = Private::gThreadSuite->multiThreadIsSpawnedThread;
586 Private::gThreadSuite = &mutexSuite;
587 }
588 }
589
590 } // namespace OFX
591
592
593