/*
 * Copyright (c) 2012-2015 Fredrik Mellbin
 *
 * This file is part of VapourSynth.
 *
 * VapourSynth is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * VapourSynth is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with VapourSynth; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include "vscore.h"
#include <cassert>
#include <bitset>
#ifdef VS_TARGET_CPU_X86
#include "x86utils.h"
#endif

#if defined(HAVE_SCHED_GETAFFINITY)
#include <sched.h>
#elif defined(HAVE_CPUSET_GETAFFINITY)
#include <sys/param.h>
#include <sys/_cpuset.h>
#include <sys/cpuset.h>
#endif

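// Determine the default worker count. The process affinity mask (Windows)
// or scheduler affinity set (Linux/FreeBSD) is preferred over
// std::thread::hardware_concurrency(), since the process may be pinned to
// a subset of the machine's logical cores.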
int VSThreadPool::getNumAvailableThreads() {
    int nthreads = std::thread::hardware_concurrency();
#ifdef _WIN32
    DWORD_PTR pAff = 0;
    DWORD_PTR sAff = 0;
    BOOL res = GetProcessAffinityMask(GetCurrentProcess(), &pAff, &sAff);
    if (res && pAff != 0) {
        std::bitset<sizeof(pAff) * 8> b(pAff);
        nthreads = b.count();
    }
#elif defined(HAVE_SCHED_GETAFFINITY)
    // Linux only.
    cpu_set_t affinity;
    if (sched_getaffinity(0, sizeof(cpu_set_t), &affinity) == 0)
        nthreads = CPU_COUNT(&affinity);
#elif defined(HAVE_CPUSET_GETAFFINITY)
    // BSD only (FreeBSD only?)
    cpuset_t affinity;
    if (cpuset_getaffinity(CPU_LEVEL_WHICH, CPU_WHICH_PID, -1, sizeof(cpuset_t), &affinity) == 0)
        nthreads = CPU_COUNT(&affinity);
#endif

    return nthreads;
}

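// Sort predicate for the task list: older requests run first, ties broken
// by the lower frame number.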
bool VSThreadPool::taskCmp(const PFrameContext &a, const PFrameContext &b) {
    return (a->reqOrder < b->reqOrder) || (a->reqOrder == b->reqOrder && a->n < b->n);
}

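// Main worker loop. Each worker repeatedly sorts the shared task list and
// scans it for the first task whose filter-mode locking rules allow it to
// run, invokes the filter, then propagates the resulting frame or error to
// every context waiting on it. When nothing is runnable, or the pool is
// over its thread budget, the worker goes idle on the newWork condition.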
void VSThreadPool::runTasks(VSThreadPool *owner, std::atomic<bool> &stop) {
#ifdef VS_TARGET_OS_WINDOWS
    if (!vs_isSSEStateOk())
        vsFatal("Bad SSE state detected after creating new thread");
#endif

    std::unique_lock<std::mutex> lock(owner->lock);

    while (true) {
        bool ranTask = false;

/////////////////////////////////////////////////////////////////////////////////////////////
// Go through all tasks from the top (oldest) and process the first one possible
        owner->tasks.sort(taskCmp);

        for (auto iter = owner->tasks.begin(); iter != owner->tasks.end(); ++iter) {
            FrameContext *mainContext = iter->get();
            FrameContext *leafContext = nullptr;

/////////////////////////////////////////////////////////////////////////////////////////////
// Handle the output tasks
            if (mainContext->frameDone && mainContext->returnedFrame) {
                PFrameContext mainContextRef(*iter);
                owner->tasks.erase(iter);
                owner->returnFrame(mainContextRef, mainContext->returnedFrame);
                ranTask = true;
                break;
            }

            if (mainContext->frameDone && mainContext->hasError()) {
                PFrameContext mainContextRef(*iter);
                owner->tasks.erase(iter);
                owner->returnFrame(mainContextRef, mainContext->getErrorMessage());
                ranTask = true;
                break;
            }

            bool hasLeafContext = mainContext->returnedFrame || mainContext->hasError();
            if (hasLeafContext) {
                leafContext = mainContext;
                mainContext = mainContext->upstreamContext.get();
            }

            VSNode *clip = mainContext->clip;
            int filterMode = clip->filterMode;

/////////////////////////////////////////////////////////////////////////////////////////////
// This part handles the locking for the different filter modes
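//
// fmUnordered/fmUnorderedLinear: at most one concurrent call into the filter,
//                                for any frame
// fmSerial:                      one call at a time, and a started frame must
//                                run to completion before another may begin
// fmParallel:                    concurrent calls allowed, but never two calls
//                                for the same frame number
// fmParallelRequests:            like fmParallel while requests are being
//                                gathered, but the final (arAllFramesReady)
//                                call is serialized through serialMutex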

            bool parallelRequestsNeedsUnlock = false;
            if (filterMode == fmUnordered || filterMode == fmUnorderedLinear) {
                // already busy?
                if (!clip->serialMutex.try_lock())
                    continue;
            } else if (filterMode == fmSerial) {
                // already busy?
                if (!clip->serialMutex.try_lock())
                    continue;
                // no frame in progress?
                if (clip->serialFrame == -1) {
                    clip->serialFrame = mainContext->n;
                // another frame already in progress?
                } else if (clip->serialFrame != mainContext->n) {
                    clip->serialMutex.unlock();
                    continue;
                }
                // continue processing the already started frame
            } else if (filterMode == fmParallel) {
                std::lock_guard<std::mutex> lock(clip->concurrentFramesMutex);
                // is the filter already processing another call for this frame? if so move along
                if (!clip->concurrentFrames.insert(mainContext->n).second)
                    continue;
            } else if (filterMode == fmParallelRequests) {
                std::lock_guard<std::mutex> lock(clip->concurrentFramesMutex);
                // if only one request remains, this call will run with
                // arAllFramesReady and therefore needs the serial lock
                if (mainContext->numFrameRequests == 1) {
                    if (!clip->serialMutex.try_lock())
                        continue;
                    parallelRequestsNeedsUnlock = true;
                } else {
                    // is the filter already processing another call for this frame? if so move along
                    if (!clip->concurrentFrames.insert(mainContext->n).second)
                        continue;
                }
            }

/////////////////////////////////////////////////////////////////////////////////////////////
// Remove the context from the task list

            PFrameContext mainContextRef;
            PFrameContext leafContextRef;
            if (hasLeafContext) {
                leafContextRef = *iter;
                mainContextRef = leafContextRef->upstreamContext;
            } else {
                mainContextRef = *iter;
            }

            owner->tasks.erase(iter);

/////////////////////////////////////////////////////////////////////////////////////////////
// Figure out the activation reason
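//
// arInitial:        first call for this frame request
// arFrameReady:     one requested frame arrived, more are still outstanding
// arAllFramesReady: the last outstanding request was just fulfilled
// arError:          an upstream request failed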

            VSActivationReason ar = arInitial;
            bool skipCall = false; // Used to avoid multiple error calls for the same frame request going into a filter
            if ((hasLeafContext && leafContext->hasError()) || mainContext->hasError()) {
                ar = arError;
                skipCall = mainContext->setError(leafContext->getErrorMessage());
                --mainContext->numFrameRequests;
            } else if (hasLeafContext && leafContext->returnedFrame) {
                if (--mainContext->numFrameRequests > 0)
                    ar = arFrameReady;
                else
                    ar = arAllFramesReady;

                mainContext->availableFrames.insert(std::make_pair(NodeOutputKey(leafContext->clip, leafContext->n, leafContext->index), leafContext->returnedFrame));
                mainContext->lastCompletedN = leafContext->n;
                mainContext->lastCompletedNode = leafContext->node;
            }

            bool hasExistingRequests = !!mainContext->numFrameRequests;
            bool isLinear = (filterMode == fmUnorderedLinear);

/////////////////////////////////////////////////////////////////////////////////////////////
// Do the actual processing

            if (!isLinear)
                lock.unlock();

            VSFrameContext externalFrameCtx(mainContextRef);
            assert(ar == arError || !mainContext->hasError());
#ifdef VS_FRAME_REQ_DEBUG
            vsWarning("Entering: %s Frame: %d Index: %d AR: %d Req: %d", mainContext->clip->name.c_str(), mainContext->n, mainContext->index, (int)ar, (int)mainContext->reqOrder);
#endif
            PVideoFrame f;
            if (!skipCall)
                f = clip->getFrameInternal(mainContext->n, ar, externalFrameCtx);
            ranTask = true;
#ifdef VS_FRAME_REQ_DEBUG
            vsWarning("Exiting: %s Frame: %d Index: %d AR: %d Req: %d", mainContext->clip->name.c_str(), mainContext->n, mainContext->index, (int)ar, (int)mainContext->reqOrder);
#endif
            bool frameProcessingDone = f || mainContext->hasError();
            if (mainContext->hasError() && f)
                vsFatal("A frame was returned by %s but an error was also set, this is not allowed", clip->name.c_str());

/////////////////////////////////////////////////////////////////////////////////////////////
// Unlock so the next job can run on the context
            if (filterMode == fmUnordered || filterMode == fmUnorderedLinear) {
                clip->serialMutex.unlock();
            } else if (filterMode == fmSerial) {
                if (frameProcessingDone)
                    clip->serialFrame = -1;
                clip->serialMutex.unlock();
            } else if (filterMode == fmParallel) {
                std::lock_guard<std::mutex> lock(clip->concurrentFramesMutex);
                clip->concurrentFrames.erase(mainContext->n);
            } else if (filterMode == fmParallelRequests) {
                std::lock_guard<std::mutex> lock(clip->concurrentFramesMutex);
                clip->concurrentFrames.erase(mainContext->n);
                if (parallelRequestsNeedsUnlock)
                    clip->serialMutex.unlock();
            }

/////////////////////////////////////////////////////////////////////////////////////////////
// Handle frames that were requested
            bool requestedFrames = !externalFrameCtx.reqList.empty() && !frameProcessingDone;

            if (!isLinear)
                lock.lock();

            if (requestedFrames) {
                for (auto &reqIter : externalFrameCtx.reqList)
                    owner->startInternal(reqIter);
                externalFrameCtx.reqList.clear();
            }

            if (frameProcessingDone)
                owner->allContexts.erase(NodeOutputKey(mainContext->clip, mainContext->n, mainContext->index));

/////////////////////////////////////////////////////////////////////////////////////////////
// Propagate status to other linked contexts
// NOTE: the loops below walk the notification chain and reassign mainContextRef as they go
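// Every context that piggybacked on this request while it was in flight
// (chained together in startInternal) receives the same frame or error.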

            if (mainContext->hasError() && !hasExistingRequests && !requestedFrames) {
                PFrameContext n;
                do {
                    n = mainContextRef->notificationChain;

                    if (n) {
                        mainContextRef->notificationChain.reset();
                        n->setError(mainContextRef->getErrorMessage());
                    }

                    if (mainContextRef->upstreamContext) {
                        owner->startInternal(mainContextRef);
                    }

                    if (mainContextRef->frameDone) {
                        owner->returnFrame(mainContextRef, mainContextRef->getErrorMessage());
                    }
                } while ((mainContextRef = n));
            } else if (f) {
                if (hasExistingRequests || requestedFrames)
                    vsFatal("A frame was returned at the end of processing by %s but there are still outstanding requests", clip->name.c_str());
                PFrameContext n;

                do {
                    n = mainContextRef->notificationChain;

                    if (n)
                        mainContextRef->notificationChain.reset();

                    if (mainContextRef->upstreamContext) {
                        mainContextRef->returnedFrame = f;
                        owner->startInternal(mainContextRef);
                    }

                    if (mainContextRef->frameDone)
                        owner->returnFrame(mainContextRef, f);
                } while ((mainContextRef = n));
            } else if (hasExistingRequests || requestedFrames) {
                // already scheduled, do nothing
            } else {
                vsFatal("No frame returned at the end of processing by %s", clip->name.c_str());
            }
            break;
        }

        if (!ranTask || owner->activeThreads > owner->maxThreads) {
            --owner->activeThreads;
            if (stop) {
                lock.unlock();
                break;
            }
            ++owner->idleThreads;
            if (owner->idleThreads == owner->allThreads.size())
                owner->allIdle.notify_one();

            owner->newWork.wait(lock);
            --owner->idleThreads;
            ++owner->activeThreads;
        }
    }
}

VSThreadPool::VSThreadPool(VSCore *core, int threads) : core(core), activeThreads(0), idleThreads(0), reqCounter(0), stopThreads(false), ticks(0) {
    setThreadCount(threads);
}

int VSThreadPool::threadCount() {
    std::lock_guard<std::mutex> l(lock);
    return maxThreads;
}

void VSThreadPool::spawnThread() {
    std::thread *thread = new std::thread(runTasks, this, std::ref(stopThreads));
    allThreads.insert(std::make_pair(thread->get_id(), thread));
    ++activeThreads;
}

int VSThreadPool::setThreadCount(int threads) {
    std::lock_guard<std::mutex> l(lock);
    maxThreads = threads > 0 ? threads : getNumAvailableThreads();
    if (maxThreads == 0) {
        maxThreads = 1;
        vsWarning("Couldn't detect optimal number of threads. Thread count set to 1.");
    }
    return maxThreads;
}

void VSThreadPool::wakeThread() {
    if (activeThreads < maxThreads) {
        if (idleThreads == 0) // newly spawned threads are active so no need to notify an additional thread
            spawnThread();
        else
            newWork.notify_one();
    }
}

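// releaseThread()/reserveThread() adjust the active-thread count: a worker
// that is about to block (for example while waiting synchronously on another
// frame) can release its slot so wakeThread() may activate a replacement,
// then reserve it back once it resumes.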
void VSThreadPool::releaseThread() {
    --activeThreads;
}

void VSThreadPool::reserveThread() {
    ++activeThreads;
}

void VSThreadPool::notifyCaches(bool needMemory) {
    std::lock_guard<std::mutex> lock(core->cacheLock);
    for (auto &cache : core->caches)
        cache->notifyCache(needMemory);
}

void VSThreadPool::start(const PFrameContext &context) {
    assert(context);
    std::lock_guard<std::mutex> l(lock);
    context->reqOrder = ++reqCounter;
    startInternal(context);
}
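
// Usage sketch (illustrative only; the exact call site is an assumption):
// a frame request builds a FrameContext carrying the frame number, node and
// completion callback, then hands it to the pool:
//
//     pool->start(ctx); // takes the pool lock, stamps reqOrder, queues the task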

void VSThreadPool::returnFrame(const PFrameContext &rCtx, const PVideoFrame &f) {
    assert(rCtx->frameDone);
    bool outputLock = rCtx->lockOnOutput;
    // we need to unlock here so the callback may request more frames without causing a deadlock
    // AND so that slow callbacks will only block operations in this thread, not all the others
    lock.unlock();
    VSFrameRef *ref = new VSFrameRef(f);
    if (outputLock)
        callbackLock.lock();
    rCtx->frameDone(rCtx->userData, ref, rCtx->n, rCtx->node, nullptr);
    if (outputLock)
        callbackLock.unlock();
    lock.lock();
}

void VSThreadPool::returnFrame(const PFrameContext &rCtx, const std::string &errMsg) {
    assert(rCtx->frameDone);
    bool outputLock = rCtx->lockOnOutput;
    // we need to unlock here so the callback may request more frames without causing a deadlock
    // AND so that slow callbacks will only block operations in this thread, not all the others
    lock.unlock();
    if (outputLock)
        callbackLock.lock();
    rCtx->frameDone(rCtx->userData, nullptr, rCtx->n, rCtx->node, errMsg.c_str());
    if (outputLock)
        callbackLock.unlock();
    lock.lock();
}

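// Queue a frame request. If an identical request (same node, frame number
// and output index) is already in flight, the new context is chained onto
// the existing one via notificationChain instead of scheduling a duplicate
// filter call. Must be called with the pool lock held.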
void VSThreadPool::startInternal(const PFrameContext &context) {
    //technically this could be done by walking up the context chain and adding a new notification to the correct one
    //unfortunately that would probably be quite slow for deep scripts, so just hope the cache catches it

    if (context->n < 0)
        vsFatal("Negative frame request by: %s", context->upstreamContext->clip->getName().c_str());

    // check to see if it's time to reevaluate cache sizes
    if (core->memory->isOverLimit()) {
        ticks = 0;
        notifyCaches(true);
    }

    // a normal tick for caches to adjust their sizes based on recent history
    if (!context->upstreamContext && ++ticks == 500) {
        ticks = 0;
        notifyCaches(false);
    }

    // add it immediately if the task is to return a completed frame or report an error since it never has an existing context
    if (context->returnedFrame || context->hasError()) {
        tasks.push_back(context);
    } else {
        if (context->upstreamContext)
            ++context->upstreamContext->numFrameRequests;

        NodeOutputKey p(context->clip, context->n, context->index);

        if (allContexts.count(p)) {
            PFrameContext &ctx = allContexts[p];
            assert(context->clip == ctx->clip && context->n == ctx->n && context->index == ctx->index);

            if (ctx->returnedFrame) {
                // special case where the requested frame is encountered "by accident"
                context->returnedFrame = ctx->returnedFrame;
                tasks.push_back(context);
            } else {
                // add it to the list of contexts to notify when it's available
                context->notificationChain = ctx->notificationChain;
                ctx->notificationChain = context;
                ctx->reqOrder = std::min(ctx->reqOrder, context->reqOrder);
            }
        } else {
            // create a new context and append it to the tasks
            allContexts[p] = context;
            tasks.push_back(context);
        }
    }
    wakeThread();
}

bool VSThreadPool::isWorkerThread() {
    std::lock_guard<std::mutex> m(lock);
    return allThreads.count(std::this_thread::get_id()) > 0;
}

void VSThreadPool::waitForDone() {
    std::unique_lock<std::mutex> m(lock);
    // loop to guard against spurious wakeups of the condition variable
    while (idleThreads < allThreads.size())
        allIdle.wait(m);
}

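// Join and destroy every worker. Setting stopThreads makes runTasks exit
// once it runs out of work; notify_all() wakes any sleeping workers so they
// can observe the flag.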
VSThreadPool::~VSThreadPool() {
    std::unique_lock<std::mutex> m(lock);
    stopThreads = true;

    while (!allThreads.empty()) {
        auto iter = allThreads.begin();
        auto thread = iter->second;
        newWork.notify_all();
        m.unlock();
        thread->join();
        m.lock();
        allThreads.erase(iter);
        delete thread;
        newWork.notify_all();
    }

    assert(activeThreads == 0);
    assert(idleThreads == 0);
}