1 /* -*- mode: c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; -*- */
2 /* ***** BEGIN LICENSE BLOCK *****
3  * This file is part of openfx-supportext <https://github.com/devernay/openfx-supportext>,
4  * Copyright (C) 2013-2018 INRIA
5  *
6  * openfx-supportext is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * openfx-supportext is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with openfx-supportext.  If not, see <http://www.gnu.org/licenses/gpl-2.0.html>
18  * ***** END LICENSE BLOCK ***** */
19 
20 /*
21  * A plugin-side multithread suite.
22  * Can be used in place of a faulty or missing host MultiThread Suite.
23  *
24  * This suite counts the number of running threads lauched by this suite only, and reports the number of free slots in multiThreadNumCPUs.
25  *
26  * The number of free slots is shared between all plugins of a multibundle.
27  */
28 
29 //#define DEBUG_STDOUT // output debug messages to stdout
30 
31 #include "ofxsThreadSuite.h"
32 #include "ofxsMultiThread.h"
33 
34 #include <cassert>
35 #include <vector>
36 #include <map>
37 #ifdef DEBUG_STDOUT
38 #include <iostream>
39 #define DBG(x) (x)
40 #else
41 #define DBG(x) (void)0
42 #endif
43 
44 // use TinyThread 1.2 from https://gitorious.org/tinythread/tinythreadpp
45 // for portable C++11-like threads
46 #include "tinythread.h"
47 // use our version of fast_mutex.h, which has bug fixes
48 //#include "fast_mutex.h"
49 
50 #include "ofxCore.h"
51 #include "ofxMultiThread.h"
52 #include "ofxsImageEffect.h"
53 
54 using namespace tthread;
55 using std::map;
56 using std::vector;
57 #ifdef DEBUG_STDOUT
58 using std::cout;
59 using std::endl;
60 #endif
61 
62 namespace {
63 
64 template<typename T>
65 static inline void
unused(const T &)66 unused(const T&) {}
67 
68 OfxStatus multiThreadNumCPUs(unsigned int *nCPUs);
69 
70 const unsigned nprocs = thread::hardware_concurrency();
71 
72 mutex occupancyLock; // protects occupancy
73 unsigned occupancy = 0;
74 
75 mutex threadIndexesLock; // protects threadIndexes
76 map<thread::id, unsigned int> threadIndexes;
77 
78 
79 struct ThreadArgs
80 {
81     OfxThreadFunctionV1* func;
82     unsigned int threadIndex;
83     unsigned int threadMax;
84     void *customArg;
85     OfxStatus ret;
86 };
87 
88 void
threadFunction(void * _args)89 threadFunction(void *_args)
90 {
91     ThreadArgs* args = (ThreadArgs*)_args;
92     assert(args->threadIndex < args->threadMax);
93 
94     args->ret = kOfxStatOK;
95     try {
96         args->func(args->threadIndex, args->threadMax, args->customArg);
97     } catch (const std::bad_alloc) {
98         args->ret = kOfxStatErrMemory;
99     } catch (...) {
100         args->ret = kOfxStatFailed;
101     }
102 }
103 
104 /**@brief Function to spawn SMP threads
105 
106  \arg func The function to call in each thread.
107  \arg nThreads The number of threads to launch
108  \arg customArg The paramter to pass to customArg of func in each thread.
109 
110  This function will spawn nThreads separate threads of computation (typically one per CPU)
111  to allow something to perform symmetric multi processing. Each thread will call 'func' passing
112  in the index of the thread and the number of threads actually launched.
113 
114  multiThread will not return until all the spawned threads have returned. It is up to the host
115  how it waits for all the threads to return (busy wait, blocking, whatever).
116 
117  \e nThreads can be more than the value returned by multiThreadNumCPUs, however the threads will
118  be limitted to the number of CPUs returned by multiThreadNumCPUs.
119 
120  This function cannot be called recursively.
121 
122  @returns
123  - ::kOfxStatOK, the function func has executed and returned sucessfully
124  - ::kOfxStatFailed, the threading function failed to launch
125  - ::kOfxStatErrExists, failed in an attempt to call multiThread recursively,
126 
127  */
128 // Note that the thread indexes are from 0 to nThreads-1.
129 // http://openfx.sourceforge.net/Documentation/1.3/ofxProgrammingReference.html#OfxMultiThreadSuiteV1_multiThread
multiThread(OfxThreadFunctionV1 func,unsigned int nThreads,void * customArg)130 OfxStatus multiThread(OfxThreadFunctionV1 func,
131                       unsigned int nThreads,
132                       void *customArg)
133 {
134     if (!func) {
135         return kOfxStatFailed;
136     }
137 
138     // check if this is a spawned thread, if yes return kOfxStatErrExists
139     {
140         lock_guard<mutex> guard(threadIndexesLock);
141         if ( threadIndexes.find( this_thread::get_id() ) != threadIndexes.end() ) {
142             return kOfxStatErrExists;
143         }
144     }
145 
146     unsigned int maxConcurrentThread;
147     OfxStatus st = multiThreadNumCPUs(&maxConcurrentThread);
148     if (st != kOfxStatOK) {
149         return st;
150     }
151 
152     // from the documentation:
153     // "nThreads can be more than the value returned by multiThreadNumCPUs, however
154     // the threads will be limitted to the number of CPUs returned by multiThreadNumCPUs."
155 
156     if ( (nThreads == 1) || (maxConcurrentThread <= 1) ) {
157         int retval;
158         {
159             lock_guard<mutex> guard(occupancyLock);
160             ++occupancy;
161         }
162         try {
163             for (unsigned int i = 0; i < nThreads; ++i) {
164                 func(i, nThreads, customArg);
165             }
166 
167             retval = kOfxStatOK;
168         } catch (...) {
169             retval = kOfxStatFailed;
170         }
171         {
172             lock_guard<mutex> guard(occupancyLock);
173             --occupancy;
174         }
175         return retval;
176     }
177 
178     // at most maxConcurrentThread should be running at the same time
179     vector<thread*> threads(nThreads, NULL);
180     vector<thread::id> threadIDs(nThreads);
181     vector<ThreadArgs> threadArgs(nThreads);
182     for (unsigned int i = 0; i < nThreads; ++i) {
183         threadArgs[i].func = func;
184         threadArgs[i].threadIndex = i;
185         threadArgs[i].threadMax = nThreads;
186         threadArgs[i].customArg = customArg;
187         threadArgs[i].ret = kOfxStatFailed;
188     }
189     unsigned int i = 0; // index of next thread to launch
190     unsigned int running = 0; // number of running threads
191     unsigned int j = 0; // index of first running thread. all threads before this one are finished running
192     while (j < nThreads) {
193         // have no more than maxConcurrentThread threads launched at the same time
194         int threadsStarted = 0;
195         {
196             lock_guard<mutex> guard(threadIndexesLock);
197 
198             while (i < nThreads && running < maxConcurrentThread) {
199                 threads[i] = new thread(threadFunction, &threadArgs[i]);
200                 threadIDs[i] = threads[i]->get_id();
201                 assert( threadIndexes.find(threadIDs[i]) == threadIndexes.end() );
202                 threadIndexes[threadIDs[i]] = threadArgs[i].threadIndex;
203                 ++i;
204                 ++running;
205                 ++threadsStarted;
206             }
207         }
208 
209         ///We just started threadsStarted threads
210         {
211             lock_guard<mutex> guard(occupancyLock);
212             occupancy += threadsStarted;
213         }
214 
215         // now we've got at most maxConcurrentThread running. wait for each thread and launch a new one
216         threads[j]->join();
217         assert( !threads[j]->joinable() );
218         {
219             lock_guard<mutex> guard(threadIndexesLock);
220             map<thread::id, unsigned int>::iterator it = threadIndexes.find(threadIDs[j]);
221             assert( it != threadIndexes.end() );
222             if ( it != threadIndexes.end() ) {
223                 threadIndexes.erase(it);
224             }
225         }
226         delete threads[j];
227         threads[j] = NULL;
228         threadIDs[j] = thread::id();
229         ++j;
230         --running;
231 
232         // We just stopped 1 thread
233         {
234             lock_guard<mutex> guard(occupancyLock);
235             --occupancy;
236         }
237     }
238     assert(running == 0);
239 
240     // check the return status of each thread, return the first error found
241     for (unsigned int i = 0; i < nThreads; ++i) {
242         OfxStatus stat = threadArgs[i].ret;
243         if (stat != kOfxStatOK) {
244             return stat;
245         }
246     }
247 
248     return kOfxStatOK;
249 }
250 
251 /**@brief Function which indicates the number of CPUs available for SMP processing
252 
253  \arg nCPUs pointer to an integer where the result is returned
254 
255  This value may be less than the actual number of CPUs on a machine, as the host may reserve other CPUs for itself.
256 
257  @returns
258  - ::kOfxStatOK, all was OK and the maximum number of threads is in nThreads.
259  - ::kOfxStatFailed, the function failed to get the number of CPUs
260  */
261 // http://openfx.sourceforge.net/Documentation/1.3/ofxProgrammingReference.html#OfxMultiThreadSuiteV1_multiThreadNumCPUs
multiThreadNumCPUs(unsigned int * nCPUs)262 OfxStatus multiThreadNumCPUs(unsigned int *nCPUs)
263 {
264     lock_guard<mutex> guard(occupancyLock);
265     *nCPUs = occupancy >= nprocs ? 1 : (nprocs - occupancy);
266     DBG(std::cout << "numCPUs=" << *nCPUs << endl);
267     return kOfxStatOK;
268 }
269 
270 /**@brief Function which indicates the index of the current thread
271 
272  \arg threadIndex  pointer to an integer where the result is returned
273 
274  This function returns the thread index, which is the same as the \e threadIndex argument passed to the ::OfxThreadFunctionV1.
275 
276  If there are no threads currently spawned, then this function will set threadIndex to 0
277 
278  @returns
279  - ::kOfxStatOK, all was OK and the maximum number of threads is in nThreads.
280  - ::kOfxStatFailed, the function failed to return an index
281  */
282 // http://openfx.sourceforge.net/Documentation/1.3/ofxProgrammingReference.html#OfxMultiThreadSuiteV1_multiThreadIndex
283 // Note that the thread indexes are from 0 to nThreads-1, so a return value of 0 does not mean that it's not a spawned thread
284 // (use multiThreadIsSpawnedThread() to check if it's a spawned thread)
multiThreadIndex(unsigned int * threadIndex)285 OfxStatus multiThreadIndex(unsigned int *threadIndex)
286 {
287     if (!threadIndex) {
288         return kOfxStatFailed;
289     }
290 
291     lock_guard<mutex> guard(threadIndexesLock);
292     map<thread::id, unsigned int>::const_iterator it = threadIndexes.find( this_thread::get_id() );
293     if ( it != threadIndexes.end() ) {
294         *threadIndex = it->second;
295     } else {
296         *threadIndex = 0;
297     }
298 
299     return kOfxStatOK;
300 }
301 
302 /**@brief Function to enquire if the calling thread was spawned by multiThread
303 
304  @returns
305  - 0 if the thread is not one spawned by multiThread
306  - 1 if the thread was spawned by multiThread
307  */
308 // http://openfx.sourceforge.net/Documentation/1.3/ofxProgrammingReference.html#OfxMultiThreadSuiteV1_multiThreadIsSpawnedThread
multiThreadIsSpawnedThread(void)309 int multiThreadIsSpawnedThread(void)
310 {
311     lock_guard<mutex> guard(threadIndexesLock);
312     return threadIndexes.find( this_thread::get_id() ) != threadIndexes.end();
313 }
314 
315 /** @brief Create a mutex
316 
317  \arg mutex - where the new handle is returned
318  \arg count - initial lock count on the mutex. This can be negative.
319 
320  Creates a new mutex with lockCount locks on the mutex intially set.
321 
322  @returns
323  - kOfxStatOK - mutex is now valid and ready to go
324  */
325 // http://openfx.sourceforge.net/Documentation/1.3/ofxProgrammingReference.html#OfxMultiThreadSuiteV1_mutexCreate
mutexCreate(OfxMutexHandle * mutex,int lockCount)326 OfxStatus mutexCreate(OfxMutexHandle *mutex, int lockCount)
327 {
328     if (!mutex) {
329         return kOfxStatFailed;
330     }
331 
332     // suite functions should not throw
333     try {
334         recursive_mutex* m = new recursive_mutex();
335         for (int i = 0; i < lockCount; ++i) {
336             m->lock();
337         }
338         *mutex = (OfxMutexHandle)(m);
339 
340         return kOfxStatOK;
341     } catch (std::bad_alloc) {
342         DBG(cout << "mutexCreate(): memory error.\n");
343 
344         return kOfxStatErrMemory;
345     } catch (const std::exception & e) {
346         DBG(cout << "mutexCreate(): " << e.what() << endl);
347         unused(e);
348 
349         return kOfxStatErrUnknown;
350     } catch (...) {
351         DBG(cout << "mutexCreate(): unknown error.\n");
352 
353         return kOfxStatErrUnknown;
354     }
355 }
356 
357 /** @brief Destroy a mutex
358 
359  Destroys a mutex intially created by mutexCreate.
360 
361  @returns
362  - kOfxStatOK - if it destroyed the mutex
363  - kOfxStatErrBadHandle - if the handle was bad
364  */
365 // http://openfx.sourceforge.net/Documentation/1.3/ofxProgrammingReference.html#OfxMultiThreadSuiteV1_mutexDestroy
mutexDestroy(const OfxMutexHandle mutex)366 OfxStatus mutexDestroy(const OfxMutexHandle mutex)
367 {
368     if (mutex == 0) {
369         return kOfxStatErrBadHandle;
370     }
371     // suite functions should not throw
372     try {
373         delete reinterpret_cast<recursive_mutex*>(mutex);
374 
375         return kOfxStatOK;
376     } catch (std::bad_alloc) {
377         DBG(cout << "mutexDestroy(): memory error.\n");
378 
379         return kOfxStatErrMemory;
380     } catch (const std::exception & e) {
381         DBG(cout << "mutexDestroy(): " << e.what() << endl);
382         unused(e);
383 
384         return kOfxStatErrUnknown;
385     } catch (...) {
386         DBG(cout << "mutexDestroy(): unknown error.\n");
387 
388         return kOfxStatErrUnknown;
389     }
390 }
391 
392 /** @brief Blocking lock on the mutex
393 
394  This trys to lock a mutex and blocks the thread it is in until the lock suceeds.
395 
396  A sucessful lock causes the mutex's lock count to be increased by one and to block any other calls to lock the mutex until it is unlocked.
397 
398  @returns
399  - kOfxStatOK - if it got the lock
400  - kOfxStatErrBadHandle - if the handle was bad
401  */
402 // http://openfx.sourceforge.net/Documentation/1.3/ofxProgrammingReference.html#OfxMultiThreadSuiteV1_mutexLock
mutexLock(const OfxMutexHandle mutex)403 OfxStatus mutexLock(const OfxMutexHandle mutex)
404 {
405     if (mutex == 0) {
406         return kOfxStatErrBadHandle;
407     }
408     // suite functions should not throw
409     try {
410         reinterpret_cast<recursive_mutex*>(mutex)->lock();
411 
412         return kOfxStatOK;
413     } catch (std::bad_alloc) {
414         DBG(cout << "mutexLock(): memory error.\n");
415 
416         return kOfxStatErrMemory;
417     } catch (const std::exception & e) {
418         DBG(cout << "mutexLock(): " << e.what() << endl);
419         unused(e);
420 
421         return kOfxStatErrUnknown;
422     } catch (...) {
423         DBG(cout << "mutexLock(): unknown error.\n");
424 
425         return kOfxStatErrUnknown;
426     }
427 }
428 
429 /** @brief Unlock the mutex
430 
431  This  unlocks a mutex. Unlocking a mutex decreases its lock count by one.
432 
433  @returns
434  - kOfxStatOK if it released the lock
435  - kOfxStatErrBadHandle if the handle was bad
436  */
437 // http://openfx.sourceforge.net/Documentation/1.3/ofxProgrammingReference.html#OfxMultiThreadSuiteV1_mutexUnLock
mutexUnLock(const OfxMutexHandle mutex)438 OfxStatus mutexUnLock(const OfxMutexHandle mutex)
439 {
440     if (mutex == 0) {
441         return kOfxStatErrBadHandle;
442     }
443     // suite functions should not throw
444     try {
445         reinterpret_cast<recursive_mutex*>(mutex)->unlock();
446 
447         return kOfxStatOK;
448     } catch (std::bad_alloc) {
449         DBG(cout << "mutexUnLock(): memory error.\n");
450 
451         return kOfxStatErrMemory;
452     } catch (const std::exception & e) {
453         DBG(cout << "mutexUnLock(): " << e.what() << endl);
454         unused(e);
455 
456         return kOfxStatErrUnknown;
457     } catch (...) {
458         DBG(cout << "mutexUnLock(): unknown error.\n");
459 
460         return kOfxStatErrUnknown;
461     }
462 }
463 
464 /** @brief Non blocking attempt to lock the mutex
465 
466  This attempts to lock a mutex, if it cannot, it returns and says so, rather than blocking.
467 
468  A sucessful lock causes the mutex's lock count to be increased by one, if the lock did not suceed, the call returns immediately and the lock count remains unchanged.
469 
470  @returns
471  - kOfxStatOK - if it got the lock
472  - kOfxStatFailed - if it did not get the lock
473  - kOfxStatErrBadHandle - if the handle was bad
474  */
475 // http://openfx.sourceforge.net/Documentation/1.3/ofxProgrammingReference.html#OfxMultiThreadSuiteV1_mutexTryLock
mutexTryLock(const OfxMutexHandle mutex)476 OfxStatus mutexTryLock(const OfxMutexHandle mutex)
477 {
478     if (mutex == 0) {
479         return kOfxStatErrBadHandle;
480     }
481     // suite functions should not throw
482     try {
483         if ( reinterpret_cast<recursive_mutex*>(mutex)->try_lock() ) {
484             return kOfxStatOK;
485         } else {
486             return kOfxStatFailed;
487         }
488     } catch (std::bad_alloc) {
489         DBG(cout << "mutexTryLock(): memory error.\n");
490 
491         return kOfxStatErrMemory;
492     } catch (const std::exception & e) {
493         DBG(cout << "mutexTryLock(): " << e.what() << endl);
494         unused(e);
495 
496         return kOfxStatErrUnknown;
497     } catch (...) {
498         DBG(cout << "mutexTryLock(): unknown error.\n");
499 
500         return kOfxStatErrUnknown;
501     }
502 }
503 
504 OfxMultiThreadSuiteV1 threadSuite = {
505     multiThread,
506     multiThreadNumCPUs,
507     multiThreadIndex,
508     multiThreadIsSpawnedThread,
509     mutexCreate,
510     mutexDestroy,
511     mutexLock,
512     mutexUnLock,
513     mutexTryLock
514 };
515 
516 OfxMultiThreadSuiteV1 mutexSuite = {
517     NULL,
518     NULL,
519     NULL,
520     NULL,
521     mutexCreate,
522     mutexDestroy,
523     mutexLock,
524     mutexUnLock,
525     mutexTryLock
526 };
527 
528 } // namespace {
529 
530 namespace OFX {
531 
532 extern ImageEffectHostDescription gHostDescription;
533 namespace Private {
534     /** @brief Pointer to the plugin-side threading suite, can be used to replace gThreadSuite */
535     OfxMultiThreadSuiteV1 *gPluginThreadSuite = &threadSuite;
536     extern int gLoadCount;
537     extern OfxMultiThreadSuiteV1 *gThreadSuite;
538 }
539 
ofxsThreadSuiteCheck()540 void ofxsThreadSuiteCheck()
541 {
542     if (Private::gThreadSuite == NULL) {
543         DBG(cout << "ofxsThreadSuiteCheck(): host has no thread suite.\n");
544     } else if (Private::gThreadSuite->multiThread == NULL) {
545         DBG(cout << "ofxsThreadSuiteCheck(): multiThread is NULL.\n");
546     } else if (Private::gThreadSuite->multiThreadNumCPUs == NULL) {
547         DBG(cout << "ofxsThreadSuiteCheck(): multiThreadNumCPUs is NULL.\n");
548     } else if (Private::gThreadSuite->multiThreadIndex == NULL) {
549         DBG(cout << "ofxsThreadSuiteCheck(): multiThreadIndex is NULL.\n");
550     } else if (Private::gThreadSuite->multiThreadIsSpawnedThread == NULL) {
551         DBG(cout << "ofxsThreadSuiteCheck(): multiThreadIsSpawnedThread is NULL.\n");
552     } else if (Private::gThreadSuite->mutexCreate == NULL) {
553         DBG(cout << "ofxsThreadSuiteCheck(): mutexCreate is NULL.\n");
554     } else if (Private::gThreadSuite->mutexDestroy == NULL) {
555         DBG(cout << "ofxsThreadSuiteCheck(): mutexDestroy is NULL.\n");
556     } else if (Private::gThreadSuite->mutexLock == NULL) {
557         DBG(cout << "ofxsThreadSuiteCheck(): mutexLock is NULL.\n");
558     } else if (Private::gThreadSuite->mutexUnLock == NULL) {
559         DBG(cout << "ofxsThreadSuiteCheck(): mutexUnLock is NULL.\n");
560     } else if (Private::gThreadSuite->mutexTryLock == NULL) {
561         DBG(cout << "ofxsThreadSuiteCheck(): mutexTryLock is NULL.\n");
562     }
563     // do it even if gLoadCount > 1. the load action is never multithreaded anyway
564     if (Private::gThreadSuite != &threadSuite &&
565         (Private::gThreadSuite == NULL ||
566          Private::gThreadSuite->multiThread == NULL ||
567          Private::gThreadSuite->multiThreadNumCPUs == NULL ||
568          Private::gThreadSuite->multiThreadIndex == NULL ||
569          Private::gThreadSuite->multiThreadIsSpawnedThread == NULL ||
570          gHostDescription.hostName.compare(0, 14, "DaVinciResolve") == 0)) { // Resolve has a dummy MT Suite, use our own
571         DBG(cout << "ofxsThreadSuiteCheck(): replacing host suite.\n");
572         Private::gThreadSuite = &threadSuite;
573     }
574     if (Private::gThreadSuite != &mutexSuite &&
575         (Private::gThreadSuite->mutexCreate == NULL ||
576          Private::gThreadSuite->mutexDestroy == NULL ||
577          Private::gThreadSuite->mutexLock == NULL ||
578          Private::gThreadSuite->mutexUnLock == NULL ||
579          Private::gThreadSuite->mutexTryLock == NULL ||
580          gHostDescription.hostName.compare(0, 22, "com.sony.Catalyst.Edit") == 0)) { // Sony Catalyst Edit (as of version  2015.15) misses the mutex functions
581         DBG(cout << "ofxsThreadSuiteCheck(): replacing host mutex suite.\n");
582         mutexSuite.multiThread = Private::gThreadSuite->multiThread;
583         mutexSuite.multiThreadNumCPUs = Private::gThreadSuite->multiThreadNumCPUs;
584         mutexSuite.multiThreadIndex = Private::gThreadSuite->multiThreadIndex;
585         mutexSuite.multiThreadIsSpawnedThread = Private::gThreadSuite->multiThreadIsSpawnedThread;
586         Private::gThreadSuite = &mutexSuite;
587     }
588 }
589 
590 } // namespace OFX
591 
592 
593