/*
* Copyright (c) 2012-2015 Fredrik Mellbin
*
* This file is part of VapourSynth.
*
* VapourSynth is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* VapourSynth is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with VapourSynth; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/

#include "vscore.h"
#include <cassert>
#include <bitset>
#ifdef VS_TARGET_CPU_X86
#include "x86utils.h"
#endif

#if defined(HAVE_SCHED_GETAFFINITY)
#include <sched.h>
#elif defined(HAVE_CPUSET_GETAFFINITY)
#include <sys/param.h>
#include <sys/_cpuset.h>
#include <sys/cpuset.h>
#endif

int VSThreadPool::getNumAvailableThreads() {
    int nthreads = std::thread::hardware_concurrency();
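    // Where the platform exposes an affinity mask, prefer it over hardware_concurrency(),
    // since the process may be restricted to a subset of the machine's cores.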
#ifdef _WIN32
    DWORD_PTR pAff = 0;
    DWORD_PTR sAff = 0;
    BOOL res = GetProcessAffinityMask(GetCurrentProcess(), &pAff, &sAff);
    if (res && pAff != 0) {
        std::bitset<sizeof(sAff) * 8> b(pAff);
        nthreads = b.count();
    }
#elif defined(HAVE_SCHED_GETAFFINITY)
    // Linux only.
    cpu_set_t affinity;
    if (sched_getaffinity(0, sizeof(cpu_set_t), &affinity) == 0)
        nthreads = CPU_COUNT(&affinity);
#elif defined(HAVE_CPUSET_GETAFFINITY)
    // BSD only (FreeBSD only?)
    cpuset_t affinity;
    if (cpuset_getaffinity(CPU_LEVEL_WHICH, CPU_WHICH_PID, -1, sizeof(cpuset_t), &affinity) == 0)
        nthreads = CPU_COUNT(&affinity);
#endif

    return nthreads;
}

bool VSThreadPool::taskCmp(const PFrameContext &a, const PFrameContext &b) {
    return (a->reqOrder < b->reqOrder) || (a->reqOrder == b->reqOrder && a->n < b->n);
}

void VSThreadPool::runTasks(VSThreadPool *owner, std::atomic<bool> &stop) {
#ifdef VS_TARGET_OS_WINDOWS
    if (!vs_isSSEStateOk())
        vsFatal("Bad SSE state detected after creating new thread");
#endif

    std::unique_lock<std::mutex> lock(owner->lock);

    while (true) {
        bool ranTask = false;

        /////////////////////////////////////////////////////////////////////////////////////////////
        // Go through all tasks from the top (oldest) and process the first one possible
        owner->tasks.sort(taskCmp);

        for (auto iter = owner->tasks.begin(); iter != owner->tasks.end(); ++iter) {
            FrameContext *mainContext = iter->get();
            FrameContext *leafContext = nullptr;

            /////////////////////////////////////////////////////////////////////////////////////////////
            // Handle the output tasks
            if (mainContext->frameDone && mainContext->returnedFrame) {
                PFrameContext mainContextRef(*iter);
                owner->tasks.erase(iter);
                owner->returnFrame(mainContextRef, mainContext->returnedFrame);
                ranTask = true;
                break;
            }

            if (mainContext->frameDone && mainContext->hasError()) {
                PFrameContext mainContextRef(*iter);
                owner->tasks.erase(iter);
                owner->returnFrame(mainContextRef, mainContext->getErrorMessage());
                ranTask = true;
                break;
            }

            bool hasLeafContext = mainContext->returnedFrame || mainContext->hasError();
            if (hasLeafContext) {
                leafContext = mainContext;
                mainContext = mainContext->upstreamContext.get();
            }

            VSNode *clip = mainContext->clip;
            int filterMode = clip->filterMode;

            /////////////////////////////////////////////////////////////////////////////////////////////
            // This part handles the locking for the different filter modes
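            // Summary of the modes handled below: fmUnordered and fmUnorderedLinear allow only one
            // call into the filter at a time (serialMutex); fmSerial additionally pins the filter to a
            // single frame number until that frame is finished; fmParallel allows concurrent calls as
            // long as they are for different frame numbers; fmParallelRequests behaves like fmParallel
            // except that the final arAllFramesReady call is also serialized through serialMutex.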

            bool parallelRequestsNeedsUnlock = false;
            if (filterMode == fmUnordered || filterMode == fmUnorderedLinear) {
                // already busy?
                if (!clip->serialMutex.try_lock())
                    continue;
            } else if (filterMode == fmSerial) {
                // already busy?
                if (!clip->serialMutex.try_lock())
                    continue;
                // no frame in progress?
                if (clip->serialFrame == -1) {
                    clip->serialFrame = mainContext->n;
                // another frame already in progress?
                } else if (clip->serialFrame != mainContext->n) {
                    clip->serialMutex.unlock();
                    continue;
                }
                // continue processing the already started frame
            } else if (filterMode == fmParallel) {
                std::lock_guard<std::mutex> lock(clip->concurrentFramesMutex);
                // is the filter already processing another call for this frame? if so move along
                if (!clip->concurrentFrames.insert(mainContext->n).second)
                    continue;
            } else if (filterMode == fmParallelRequests) {
                std::lock_guard<std::mutex> lock(clip->concurrentFramesMutex);
                // is this the call where all requested frames are ready (arAllFramesReady)?
                // if so the serial lock is needed as well
                if (mainContext->numFrameRequests == 1) {
                    if (!clip->serialMutex.try_lock())
                        continue;
                    parallelRequestsNeedsUnlock = true;
                } else {
                    // is the filter already processing another call for this frame? if so move along
                    if (!clip->concurrentFrames.insert(mainContext->n).second)
                        continue;
                }
            }

            /////////////////////////////////////////////////////////////////////////////////////////////
            // Remove the context from the task list

            PFrameContext mainContextRef;
            PFrameContext leafContextRef;
            if (hasLeafContext) {
                leafContextRef = *iter;
                mainContextRef = leafContextRef->upstreamContext;
            } else {
                mainContextRef = *iter;
            }

            owner->tasks.erase(iter);

            /////////////////////////////////////////////////////////////////////////////////////////////
            // Figure out the activation reason
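            // The filter is entered with arInitial the first time a frame is worked on, with
            // arFrameReady each time one of its requested frames arrives while more are still pending,
            // with arAllFramesReady when the last pending request has been fulfilled, and with arError
            // if this context or the completed upstream request carries an error.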
            VSActivationReason ar = arInitial;
            bool skipCall = false; // Used to avoid multiple error calls for the same frame request going into a filter
            if ((hasLeafContext && leafContext->hasError()) || mainContext->hasError()) {
                ar = arError;
                skipCall = mainContext->setError(leafContext->getErrorMessage());
                --mainContext->numFrameRequests;
            } else if (hasLeafContext && leafContext->returnedFrame) {
                if (--mainContext->numFrameRequests > 0)
                    ar = arFrameReady;
                else
                    ar = arAllFramesReady;

                mainContext->availableFrames.insert(std::make_pair(NodeOutputKey(leafContext->clip, leafContext->n, leafContext->index), leafContext->returnedFrame));
                mainContext->lastCompletedN = leafContext->n;
                mainContext->lastCompletedNode = leafContext->node;
            }

            bool hasExistingRequests = !!mainContext->numFrameRequests;
            bool isLinear = (filterMode == fmUnorderedLinear);

            /////////////////////////////////////////////////////////////////////////////////////////////
            // Do the actual processing

            if (!isLinear)
                lock.unlock();

            VSFrameContext externalFrameCtx(mainContextRef);
            assert(ar == arError || !mainContext->hasError());
#ifdef VS_FRAME_REQ_DEBUG
            vsWarning("Entering: %s Frame: %d Index: %d AR: %d Req: %d", mainContext->clip->name.c_str(), mainContext->n, mainContext->index, (int)ar, (int)mainContext->reqOrder);
#endif
            PVideoFrame f;
            if (!skipCall)
                f = clip->getFrameInternal(mainContext->n, ar, externalFrameCtx);
            ranTask = true;
#ifdef VS_FRAME_REQ_DEBUG
            vsWarning("Exiting: %s Frame: %d Index: %d AR: %d Req: %d", mainContext->clip->name.c_str(), mainContext->n, mainContext->index, (int)ar, (int)mainContext->reqOrder);
#endif
            bool frameProcessingDone = f || mainContext->hasError();
            if (mainContext->hasError() && f)
                vsFatal("A frame was returned by %s but an error was also set, this is not allowed", clip->name.c_str());

            /////////////////////////////////////////////////////////////////////////////////////////////
            // Unlock so the next job can run on the context
            if (filterMode == fmUnordered || filterMode == fmUnorderedLinear) {
                clip->serialMutex.unlock();
            } else if (filterMode == fmSerial) {
                if (frameProcessingDone)
                    clip->serialFrame = -1;
                clip->serialMutex.unlock();
            } else if (filterMode == fmParallel) {
                std::lock_guard<std::mutex> lock(clip->concurrentFramesMutex);
                clip->concurrentFrames.erase(mainContext->n);
            } else if (filterMode == fmParallelRequests) {
                std::lock_guard<std::mutex> lock(clip->concurrentFramesMutex);
                clip->concurrentFrames.erase(mainContext->n);
                if (parallelRequestsNeedsUnlock)
                    clip->serialMutex.unlock();
            }

            /////////////////////////////////////////////////////////////////////////////////////////////
            // Handle frames that were requested
            bool requestedFrames = !externalFrameCtx.reqList.empty() && !frameProcessingDone;

            if (!isLinear)
                lock.lock();

            if (requestedFrames) {
                for (auto &reqIter : externalFrameCtx.reqList)
                    owner->startInternal(reqIter);
                externalFrameCtx.reqList.clear();
            }

            if (frameProcessingDone)
                owner->allContexts.erase(NodeOutputKey(mainContext->clip, mainContext->n, mainContext->index));

            /////////////////////////////////////////////////////////////////////////////////////////////
            // Propagate status to other linked contexts
            // CHANGES mainContextRef!!!
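            // Every context waiting on this (node, frame) pair is linked through notificationChain
            // (set up in startInternal when a duplicate request is merged), so walk that chain and
            // hand each waiting context the error or the finished frame, re-queueing contexts that
            // still have an upstream consumer and firing frameDone callbacks for output requests.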
            if (mainContext->hasError() && !hasExistingRequests && !requestedFrames) {
                PFrameContext n;
                do {
                    n = mainContextRef->notificationChain;

                    if (n) {
                        mainContextRef->notificationChain.reset();
                        n->setError(mainContextRef->getErrorMessage());
                    }

                    if (mainContextRef->upstreamContext) {
                        owner->startInternal(mainContextRef);
                    }

                    if (mainContextRef->frameDone) {
                        owner->returnFrame(mainContextRef, mainContextRef->getErrorMessage());
                    }
                } while ((mainContextRef = n));
            } else if (f) {
                if (hasExistingRequests || requestedFrames)
                    vsFatal("A frame was returned at the end of processing by %s but there are still outstanding requests", clip->name.c_str());
                PFrameContext n;

                do {
                    n = mainContextRef->notificationChain;

                    if (n)
                        mainContextRef->notificationChain.reset();

                    if (mainContextRef->upstreamContext) {
                        mainContextRef->returnedFrame = f;
                        owner->startInternal(mainContextRef);
                    }

                    if (mainContextRef->frameDone)
                        owner->returnFrame(mainContextRef, f);
                } while ((mainContextRef = n));
            } else if (hasExistingRequests || requestedFrames) {
                // already scheduled, do nothing
            } else {
                vsFatal("No frame returned at the end of processing by %s", clip->name.c_str());
            }
            break;
        }

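        // No runnable task was found, or the pool now has more active threads than maxThreads allows:
        // park this thread until new work arrives, or let it exit if the pool is being shut down.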
        if (!ranTask || owner->activeThreads > owner->maxThreads) {
            --owner->activeThreads;
            if (stop) {
                lock.unlock();
                break;
            }
            ++owner->idleThreads;
            if (owner->idleThreads == owner->allThreads.size())
                owner->allIdle.notify_one();

            owner->newWork.wait(lock);
            --owner->idleThreads;
            ++owner->activeThreads;
        }
    }
}

VSThreadPool::VSThreadPool(VSCore *core, int threads) : core(core), activeThreads(0), idleThreads(0), reqCounter(0), stopThreads(false), ticks(0) {
    setThreadCount(threads);
}

int VSThreadPool::threadCount() {
    std::lock_guard<std::mutex> l(lock);
    return maxThreads;
}

void VSThreadPool::spawnThread() {
    std::thread *thread = new std::thread(runTasks, this, std::ref(stopThreads));
    allThreads.insert(std::make_pair(thread->get_id(), thread));
    ++activeThreads;
}

int VSThreadPool::setThreadCount(int threads) {
    std::lock_guard<std::mutex> l(lock);
    maxThreads = threads > 0 ? threads : getNumAvailableThreads();
    if (maxThreads == 0) {
        maxThreads = 1;
        vsWarning("Couldn't detect optimal number of threads. Thread count set to 1.");
    }
    return maxThreads;
}

void VSThreadPool::wakeThread() {
    if (activeThreads < maxThreads) {
        if (idleThreads == 0) // newly spawned threads are active so no need to notify an additional thread
            spawnThread();
        else
            newWork.notify_one();
    }
}

void VSThreadPool::releaseThread() {
    --activeThreads;
}

void VSThreadPool::reserveThread() {
    ++activeThreads;
}

void VSThreadPool::notifyCaches(bool needMemory) {
    std::lock_guard<std::mutex> lock(core->cacheLock);
    for (auto &cache : core->caches)
        cache->notifyCache(needMemory);
}

void VSThreadPool::start(const PFrameContext &context) {
    assert(context);
    std::lock_guard<std::mutex> l(lock);
    context->reqOrder = ++reqCounter;
    startInternal(context);
}

void VSThreadPool::returnFrame(const PFrameContext &rCtx, const PVideoFrame &f) {
    assert(rCtx->frameDone);
    bool outputLock = rCtx->lockOnOutput;
    // we need to unlock here so the callback may request more frames without causing a deadlock
    // AND so that slow callbacks will only block operations in this thread, not all the others
    lock.unlock();
    VSFrameRef *ref = new VSFrameRef(f);
    if (outputLock)
        callbackLock.lock();
    rCtx->frameDone(rCtx->userData, ref, rCtx->n, rCtx->node, nullptr);
    if (outputLock)
        callbackLock.unlock();
    lock.lock();
}

void VSThreadPool::returnFrame(const PFrameContext &rCtx, const std::string &errMsg) {
    assert(rCtx->frameDone);
    bool outputLock = rCtx->lockOnOutput;
    // we need to unlock here so the callback may request more frames without causing a deadlock
    // AND so that slow callbacks will only block operations in this thread, not all the others
    lock.unlock();
    if (outputLock)
        callbackLock.lock();
    rCtx->frameDone(rCtx->userData, nullptr, rCtx->n, rCtx->node, errMsg.c_str());
    if (outputLock)
        callbackLock.unlock();
    lock.lock();
}

void VSThreadPool::startInternal(const PFrameContext &context) {
    //technically this could be done by walking up the context chain and adding a new notification to the correct one
    //unfortunately this would probably be quite slow for deep scripts so just hope the cache catches it

    if (context->n < 0)
        vsFatal("Negative frame request by: %s", context->upstreamContext->clip->getName().c_str());

    // check to see if it's time to reevaluate cache sizes
    if (core->memory->isOverLimit()) {
        ticks = 0;
        notifyCaches(true);
    }

    // a normal tick for caches to adjust their sizes based on recent history
    if (!context->upstreamContext && ++ticks == 500) {
        ticks = 0;
        notifyCaches(false);
    }

    // add it immediately if the task is to return a completed frame or report an error since it never has an existing context
    if (context->returnedFrame || context->hasError()) {
        tasks.push_back(context);
    } else {
        if (context->upstreamContext)
            ++context->upstreamContext->numFrameRequests;

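        // Requests are deduplicated on (node, frame number, output index): if a context for this key
        // is already in flight, chain this request onto it instead of queueing a second task.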
        NodeOutputKey p(context->clip, context->n, context->index);

        if (allContexts.count(p)) {
            PFrameContext &ctx = allContexts[p];
            assert(context->clip == ctx->clip && context->n == ctx->n && context->index == ctx->index);

            if (ctx->returnedFrame) {
                // special case where the requested frame is encountered "by accident"
                context->returnedFrame = ctx->returnedFrame;
                tasks.push_back(context);
            } else {
                // add it to the list of contexts to notify when it's available
                context->notificationChain = ctx->notificationChain;
                ctx->notificationChain = context;
                ctx->reqOrder = std::min(ctx->reqOrder, context->reqOrder);
            }
        } else {
            // create a new context and append it to the tasks
            allContexts[p] = context;
            tasks.push_back(context);
        }
    }
    wakeThread();
}

bool VSThreadPool::isWorkerThread() {
    std::lock_guard<std::mutex> m(lock);
    return allThreads.count(std::this_thread::get_id()) > 0;
}

void VSThreadPool::waitForDone() {
    std::unique_lock<std::mutex> m(lock);
    if (idleThreads < allThreads.size())
        allIdle.wait(m);
}

VSThreadPool::~VSThreadPool() {
    std::unique_lock<std::mutex> m(lock);
    stopThreads = true;

    while (!allThreads.empty()) {
        auto iter = allThreads.begin();
        auto thread = iter->second;
        newWork.notify_all();
        m.unlock();
        thread->join();
        m.lock();
        allThreads.erase(iter);
        delete thread;
        newWork.notify_all();
    }

    assert(activeThreads == 0);
    assert(idleThreads == 0);
}