1 // Copyright (c) 2018-2020 Intel Corporation
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a copy
4 // of this software and associated documentation files (the "Software"), to deal
5 // in the Software without restriction, including without limitation the rights
6 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7 // copies of the Software, and to permit persons to whom the Software is
8 // furnished to do so, subject to the following conditions:
9 //
10 // The above copyright notice and this permission notice shall be included in all
11 // copies or substantial portions of the Software.
12 //
13 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19 // SOFTWARE.
20
21 #include <mfx_scheduler_core.h>
22
23 #include <mfx_scheduler_core_task.h>
24 #include <mfx_scheduler_core_handle.h>
25
26 #include <vm_time.h>
27 #include <vm_sys_info.h>
28 #include <mfx_trace.h>
29
30 #include <functional>
31 #include <cassert>
32 #include <list>
33
// Wait durations handed to Synchronize() as its timeToWait argument.
// NOTE(review): the multi-threaded Synchronize() path treats the value as
// milliseconds; confirm the unit for the single-threaded path.
enum
{
    // short polling slice used by WaitForAllTasksCompletion()
    MFX_TIME_TO_WAIT  = 5,

    // "wait as long as it takes" value used by WaitForDependencyResolved()
    MFX_TIME_INFINITE = 0x7fffffff
};
39
Initialize(const MFX_SCHEDULER_PARAM * pParam)40 mfxStatus mfxSchedulerCore::Initialize(const MFX_SCHEDULER_PARAM *pParam)
41 {
42 MFX_SCHEDULER_PARAM2 param2;
43 memset(¶m2, 0, sizeof(param2));
44 if (pParam) {
45 MFX_SCHEDULER_PARAM* casted_param2 = ¶m2;
46 *casted_param2 = *pParam;
47 }
48 if (!param2.numberOfThreads) {
49 // that's just in case: core, which calls scheduler, is doing
50 // exactly the same what we are doing below
51 param2.numberOfThreads = vm_sys_info_get_cpu_num();
52 }
53 return Initialize2(¶m2);
54 }
55
Initialize2(const MFX_SCHEDULER_PARAM2 * pParam)56 mfxStatus mfxSchedulerCore::Initialize2(const MFX_SCHEDULER_PARAM2 *pParam)
57 {
58 mfxU32 i;
59
60 // release the object before initialization
61 Close();
62
63
64 // save the parameters
65 if (pParam)
66 {
67 m_param = *pParam;
68 }
69
70 // clean up the task look up table
71 m_ppTaskLookUpTable.resize(MFX_MAX_NUMBER_TASK, nullptr);
72
73 // allocate the dependency table
74 m_pDependencyTable.resize(MFX_MAX_NUMBER_TASK * 2, MFX_DEPENDENCY_ITEM());
75
76 // allocate the thread assignment object table.
77 // its size should be equal to the number of task,
78 // larger table is not required.
79 m_occupancyTable.resize(MFX_MAX_NUMBER_TASK, MFX_THREAD_ASSIGNMENT());
80
81 if (MFX_SINGLE_THREAD != m_param.flags)
82 {
83 if (m_param.numberOfThreads && m_param.params.NumThread) {
84 // use user-overwritten number of threads
85 m_param.numberOfThreads = m_param.params.NumThread;
86 }
87 if (!m_param.numberOfThreads) {
88 return MFX_ERR_UNSUPPORTED;
89 }
90 if (m_param.numberOfThreads == 1) {
91 // we need at least 2 threads to avoid dead locks
92 return MFX_ERR_UNSUPPORTED;
93 }
94
95
96 try
97 {
98 // allocate thread contexts
99 m_pThreadCtx = new MFX_SCHEDULER_THREAD_CONTEXT[m_param.numberOfThreads];
100
101 // start threads
102 for (i = 0; i < m_param.numberOfThreads; i += 1)
103 {
104 // prepare context
105 m_pThreadCtx[i].threadNum = i;
106 m_pThreadCtx[i].pSchedulerCore = this;
107
108 // spawn a thread
109 m_pThreadCtx[i].threadHandle = std::thread(
110 std::bind(&mfxSchedulerCore::ThreadProc, this, &m_pThreadCtx[i]));
111
112 if (!SetScheduling(m_pThreadCtx[i].threadHandle)) {
113 return MFX_ERR_UNSUPPORTED;
114 }
115 }
116 }
117 catch (...)
118 {
119 return MFX_ERR_MEMORY_ALLOC;
120 }
121
122
123 SetThreadsAffinityToSockets();
124 }
125 else
126 {
127 // to run HW listen thread. Will be enabled if tests are OK
128
129 }
130
131 return MFX_ERR_NONE;
132
133 } // mfxStatus mfxSchedulerCore::Initialize(mfxSchedulerFlags flags, mfxU32 numberOfThreads)
134
AddTask(const MFX_TASK & task,mfxSyncPoint * pSyncPoint)135 mfxStatus mfxSchedulerCore::AddTask(const MFX_TASK &task, mfxSyncPoint *pSyncPoint)
136 {
137 return AddTask(task, pSyncPoint, NULL, 0);
138
139 } // mfxStatus mfxSchedulerCore::AddTask(const MFX_TASK &task, mfxSyncPoint *pSyncPoint)
140
Synchronize(mfxSyncPoint syncPoint,mfxU32 timeToWait)141 mfxStatus mfxSchedulerCore::Synchronize(mfxSyncPoint syncPoint, mfxU32 timeToWait)
142 {
143 mfxTaskHandle handle;
144 mfxStatus mfxRes;
145
146 // check error(s)
147 if (NULL == syncPoint)
148 {
149 return MFX_ERR_NULL_PTR;
150 }
151
152 // cast the pointer to handle and make syncing
153 handle.handle = (size_t) syncPoint;
154
155
156 // waiting on task's handle
157 mfxRes = Synchronize(handle, timeToWait);
158
159
160 return mfxRes;
161
162 } // mfxStatus mfxSchedulerCore::Synchronize(mfxSyncPoint syncPoint, mfxU32 timeToWait)
163
Synchronize(mfxTaskHandle handle,mfxU32 timeToWait)164 mfxStatus mfxSchedulerCore::Synchronize(mfxTaskHandle handle, mfxU32 timeToWait)
165 {
166 // check error(s)
167 if (0 == m_param.numberOfThreads)
168 {
169 return MFX_ERR_NOT_INITIALIZED;
170 }
171
172 // look up the task
173 MFX_SCHEDULER_TASK *pTask = m_ppTaskLookUpTable.at(handle.taskID);
174
175 if (nullptr == pTask)
176 {
177 return MFX_ERR_NULL_PTR;
178 }
179
180 if (MFX_SINGLE_THREAD == m_param.flags)
181 {
182 //let really run task to
183 MFX_CALL_INFO call = {};
184 mfxTaskHandle previousTaskHandle = {};
185
186 mfxStatus task_sts = MFX_ERR_NONE;
187 mfxU64 start = GetHighPerformanceCounter();
188 mfxU64 frequency = vm_time_get_frequency();
189 while (MFX_WRN_IN_EXECUTION == pTask->opRes)
190 {
191 std::unique_lock<std::mutex> guard(m_guard);
192 task_sts = GetTask(call, previousTaskHandle, 0);
193
194 if (task_sts != MFX_ERR_NONE)
195 continue;
196
197 guard.unlock();
198
199 call.res = call.pTask->entryPoint.pRoutine(call.pTask->entryPoint.pState,
200 call.pTask->entryPoint.pParam,
201 call.threadNum,
202 call.callNum);
203
204 guard.lock();
205
206 // save the previous task's handle
207 previousTaskHandle = call.taskHandle;
208
209 MarkTaskCompleted(&call, 0);
210
211 if ((mfxU32)((GetHighPerformanceCounter() - start)/frequency) > timeToWait)
212 break;
213
214 if (MFX_TASK_DONE!= call.res)
215 {
216 IncrementHWEventCounter();
217 }
218 }
219 //
220 // inspect the task
221 //
222
223 // the handle is outdated,
224 // the previous task job is over and completed with successful status
225 // NOTE: it make sense to read task result and job ID the following order.
226 if ((MFX_ERR_NONE == pTask->opRes) ||
227 (pTask->jobID != handle.jobID))
228 {
229 return MFX_ERR_NONE;
230 }
231
232 // wait the result on the event
233 if (MFX_WRN_IN_EXECUTION == pTask->opRes)
234 {
235 return MFX_WRN_IN_EXECUTION;
236 }
237
238 // check error status
239 if ((MFX_ERR_NONE != pTask->opRes) &&
240 (pTask->jobID == handle.jobID))
241 {
242 return pTask->opRes;
243 }
244
245 // in all other cases task is complete
246 return MFX_ERR_NONE;
247 }
248 else
249 {
250 std::unique_lock<std::mutex> guard(m_guard);
251
252 MFX_AUTO_LTRACE(MFX_TRACE_LEVEL_PRIVATE, "Scheduler::Wait");
253 MFX_LTRACE_1(MFX_TRACE_LEVEL_SCHED, "^Depends^on", "%d", pTask->param.task.nParentId);
254 MFX_LTRACE_I(MFX_TRACE_LEVEL_SCHED, timeToWait);
255
256 pTask->done.wait_for(guard, std::chrono::milliseconds(timeToWait), [pTask, handle] {
257 return (pTask->jobID != handle.jobID) || (MFX_WRN_IN_EXECUTION != pTask->opRes);
258 });
259
260 if (pTask->jobID == handle.jobID) {
261 return pTask->opRes;
262 } else {
263 /* Notes:
264 * - task executes next job already, we _lost_ task status and can only assume that
265 * everything was OK or FAILED, we will assume that task succeeded
266 */
267 return MFX_ERR_NONE;
268 }
269 }
270 }
271
GetTimeout(mfxU32 & maxTimeToRun)272 mfxStatus mfxSchedulerCore::GetTimeout(mfxU32& maxTimeToRun)
273 {
274 (void)maxTimeToRun;
275
276 return MFX_ERR_UNSUPPORTED;
277 }
278
WaitForDependencyResolved(const void * pDependency)279 mfxStatus mfxSchedulerCore::WaitForDependencyResolved(const void *pDependency)
280 {
281 mfxTaskHandle waitHandle = {};
282 bool bFind = false;
283
284 // check error(s)
285 if (0 == m_param.numberOfThreads)
286 {
287 return MFX_ERR_NOT_INITIALIZED;
288 }
289 if (NULL == pDependency)
290 {
291 return MFX_ERR_NONE;
292 }
293
294 // find a handle to wait
295 {
296 std::lock_guard<std::mutex> guard(m_guard);
297 mfxU32 curIdx;
298
299 for (curIdx = 0; curIdx < m_numDependencies; curIdx += 1)
300 {
301 if (m_pDependencyTable.at(curIdx).p == pDependency)
302 {
303 // get the handle before leaving protected section
304 waitHandle.taskID = m_pDependencyTable[curIdx].pTask->taskID;
305 waitHandle.jobID = m_pDependencyTable[curIdx].pTask->jobID;
306
307 // handle is found, go to wait
308 bFind = true;
309 break;
310 }
311 }
312 // leave the protected section
313 }
314
315 // the dependency is still in the table,
316 // wait until it leaves the table.
317 if (bFind)
318 {
319 return Synchronize(waitHandle, MFX_TIME_INFINITE);
320 }
321
322 return MFX_ERR_NONE;
323
324 } // mfxStatus mfxSchedulerCore::WaitForDependencyResolved(const void *pDependency)
325
WaitForAllTasksCompletion(const void * pOwner)326 mfxStatus mfxSchedulerCore::WaitForAllTasksCompletion(const void *pOwner)
327 {
328 // check error(s)
329 if (0 == m_param.numberOfThreads)
330 {
331 return MFX_ERR_NOT_INITIALIZED;
332 }
333 if (NULL == pOwner)
334 {
335 return MFX_ERR_NULL_PTR;
336 }
337
338 // make sure that threads are running
339 {
340 std::lock_guard<std::mutex> guard(m_guard);
341
342 ResetWaitingTasks(pOwner);
343 WakeUpThreads();
344 }
345
346 std::list<mfxTaskHandle> tasks;
347
348 {
349 std::lock_guard<std::mutex> guard(m_guard);
350
351 ForEachTask(
352 [&pOwner, &tasks](MFX_SCHEDULER_TASK* task)
353 {
354 //make a list of all 'active' tasks of given owner
355 if ((task->param.task.pOwner == pOwner) && (MFX_WRN_IN_EXECUTION == task->opRes))
356 {
357 mfxTaskHandle waitHandle;
358 waitHandle.taskID = task->taskID;
359 waitHandle.jobID = task->jobID;
360
361 tasks.emplace_back(waitHandle);
362 }
363 }
364 );
365 }
366
367 auto handle = tasks.begin();
368 while (!tasks.empty())
369 {
370 if (tasks.end() == handle)
371 handle = tasks.begin();
372
373 // wait for a while
374 // for a while and infinite are different things
375 if (MFX_WRN_IN_EXECUTION == Synchronize(*handle, MFX_TIME_TO_WAIT)) // Still working
376 {
377 handle++;
378 }
379 else // Done or failed
380 {
381 handle = tasks.erase(handle);
382 }
383 }
384
385 return MFX_ERR_NONE;
386
387 } // mfxStatus mfxSchedulerCore::WaitForAllTasksCompletion(const void *pOwner)
388
ResetWaitingStatus(const void * pOwner)389 mfxStatus mfxSchedulerCore::ResetWaitingStatus(const void *pOwner)
390 {
391 // reset 'waiting' tasks belong to the given state
392 ResetWaitingTasks(pOwner);
393
394 std::lock_guard<std::mutex> guard(m_guard);
395
396 // wake up sleeping threads
397 WakeUpThreads();
398
399 return MFX_ERR_NONE;
400
401 } // mfxStatus mfxSchedulerCore::ResetWaitingStatus(const void *pOwner)
402
GetState(void)403 mfxStatus mfxSchedulerCore::GetState(void)
404 {
405 // check error(s)
406 if (0 == m_param.numberOfThreads)
407 {
408 return MFX_ERR_NOT_INITIALIZED;
409 }
410
411 return MFX_ERR_NONE;
412
413 } // mfxStatus mfxSchedulerCore::GetState(void)
414
GetParam(MFX_SCHEDULER_PARAM * pParam)415 mfxStatus mfxSchedulerCore::GetParam(MFX_SCHEDULER_PARAM *pParam)
416 {
417 // check error(s)
418 if (0 == m_param.numberOfThreads)
419 {
420 return MFX_ERR_NOT_INITIALIZED;
421 }
422 if (NULL == pParam)
423 {
424 return MFX_ERR_NULL_PTR;
425 }
426
427 // copy the parameters
428 *pParam = m_param;
429
430 return MFX_ERR_NONE;
431
432 } // mfxStatus mfxSchedulerCore::GetParam(MFX_SCHEDULER_PARAM *pParam)
433
Reset(void)434 mfxStatus mfxSchedulerCore::Reset(void)
435 {
436 // check error(s)
437 if (0 == m_param.numberOfThreads)
438 {
439 return MFX_ERR_NOT_INITIALIZED;
440 }
441
442 if (NULL == m_pFailedTasks)
443 {
444 return MFX_ERR_NONE;
445 }
446
447 // enter guarded section
448 {
449 std::lock_guard<std::mutex> guard(m_guard);
450
451 // clean up the working queue
452 ScrubCompletedTasks(true);
453 }
454
455 return MFX_ERR_NONE;
456
457 } // mfxStatus mfxSchedulerCore::Reset(void)
458
AdjustPerformance(const mfxSchedulerMessage message)459 mfxStatus mfxSchedulerCore::AdjustPerformance(const mfxSchedulerMessage message)
460 {
461 mfxStatus mfxRes = MFX_ERR_NONE;
462
463 // check error(s)
464 if (0 == m_param.numberOfThreads)
465 {
466 return MFX_ERR_NOT_INITIALIZED;
467 }
468
469 switch(message)
470 {
471 // reset the scheduler to the performance default state
472 case MFX_SCHEDULER_RESET_TO_DEFAULTS:
473 break;
474
475 case MFX_SCHEDULER_START_HW_LISTENING:
476 if (m_param.flags != MFX_SINGLE_THREAD)
477 {
478 mfxRes = StartWakeUpThread();
479 }
480 break;
481
482 case MFX_SCHEDULER_STOP_HW_LISTENING:
483 if (m_param.flags != MFX_SINGLE_THREAD)
484 {
485 mfxRes = StopWakeUpThread();
486 }
487 break;
488
489 // unknown message
490 default:
491 mfxRes = MFX_ERR_UNKNOWN;
492 break;
493 }
494
495 return mfxRes;
496
497 } // mfxStatus mfxSchedulerCore::AdjustPerformance(const mfxSchedulerMessage message)
498
499
AddTask(const MFX_TASK & task,mfxSyncPoint * pSyncPoint,const char * pFileName,int lineNumber)500 mfxStatus mfxSchedulerCore::AddTask(const MFX_TASK &task, mfxSyncPoint *pSyncPoint,
501 const char *pFileName, int lineNumber)
502 {
503 #ifdef MFX_TRACE_ENABLE
504 MFX_LTRACE_1(MFX_TRACE_LEVEL_SCHED, "^Enqueue^", "%d", task.nTaskId);
505 #endif
506
507
508 // check error(s)
509 if (0 == m_param.numberOfThreads)
510 {
511 return MFX_ERR_NOT_INITIALIZED;
512 }
513 if ((NULL == task.entryPoint.pRoutine) ||
514 (NULL == pSyncPoint))
515 {
516 return MFX_ERR_NULL_PTR;
517 }
518
519
520 // enter protected section
521 {
522 std::unique_lock<std::mutex> guard(m_guard);
523 // make sure that there is enough free task objects
524 m_freeTasks.wait(guard, [this](){return m_freeTasksCount > 0;});
525 --m_freeTasksCount;
526 mfxStatus mfxRes;
527 MFX_SCHEDULER_TASK *pTask, **ppTemp;
528 mfxTaskHandle handle;
529 MFX_THREAD_ASSIGNMENT *pAssignment = nullptr;
530 mfxU32 occupancyIdx;
531 int type;
532
533 // Make sure that there is an empty task object
534
535 mfxRes = AllocateEmptyTask();
536 if (MFX_ERR_NONE != mfxRes)
537 {
538 // better to return error instead of WRN (two-tasks per component scheme)
539 return MFX_ERR_MEMORY_ALLOC;
540 }
541
542 // initialize the task
543 m_pFreeTasks->ResetDependency();
544 mfxRes = m_pFreeTasks->Reset();
545 if (MFX_ERR_NONE != mfxRes)
546 {
547 return mfxRes;
548 }
549 m_pFreeTasks->param.task = task;
550 mfxRes = GetOccupancyTableIndex(occupancyIdx, &task);
551 if (MFX_ERR_NONE != mfxRes)
552 {
553 return mfxRes;
554 }
555 if (m_occupancyTable.size() <= occupancyIdx)
556 {
557 return MFX_ERR_UNDEFINED_BEHAVIOR;
558 }
559 pAssignment = &(m_occupancyTable[occupancyIdx]);
560
561 // update the thread assignment parameters
562 if (MFX_TASK_INTRA & task.threadingPolicy)
563 {
564 // last entries in the dependency arrays must be empty
565 if ((m_pFreeTasks->param.task.pSrc[MFX_TASK_NUM_DEPENDENCIES - 1]) ||
566 (m_pFreeTasks->param.task.pDst[MFX_TASK_NUM_DEPENDENCIES - 1]))
567 {
568 return MFX_ERR_INVALID_VIDEO_PARAM;
569 }
570
571 // fill INTRA task dependencies
572 m_pFreeTasks->param.task.pSrc[MFX_TASK_NUM_DEPENDENCIES - 1] = pAssignment->pLastTask;
573 m_pFreeTasks->param.task.pDst[MFX_TASK_NUM_DEPENDENCIES - 1] = m_pFreeTasks;
574 // update the last intra task pointer
575 pAssignment->pLastTask = m_pFreeTasks;
576 }
577 // do not save the pointer to thread assigment instance
578 // until all checking have been done
579 m_pFreeTasks->param.pThreadAssignment = pAssignment;
580 pAssignment->m_numRefs += 1;
581
582 // saturate the number of available threads
583 uint32_t numThreads = m_pFreeTasks->param.task.entryPoint.requiredNumThreads;
584 numThreads = (0 == numThreads) ? m_param.numberOfThreads : numThreads;
585
586 numThreads = std::min<uint32_t>({m_param.numberOfThreads, numThreads, sizeof(pAssignment->threadMask) * 8});
587 m_pFreeTasks->param.task.entryPoint.requiredNumThreads = numThreads;
588
589 // set the advanced task's info
590 m_pFreeTasks->param.sourceInfo.pFileName = pFileName;
591 m_pFreeTasks->param.sourceInfo.lineNumber = lineNumber;
592 // set the sync point for the task
593 handle.handle = 0;
594 handle.taskID = m_pFreeTasks->taskID;
595 handle.jobID = m_pFreeTasks->jobID;
596 *pSyncPoint = (mfxSyncPoint) handle.handle;
597
598 // Register task dependencies
599 RegisterTaskDependencies(m_pFreeTasks);
600
601
602 //
603 // move task to the corresponding task
604 //
605
606 // remove the task from the 'free' queue
607 pTask = m_pFreeTasks;
608 m_pFreeTasks = m_pFreeTasks->pNext;
609 pTask->pNext = NULL;
610
611 // find the end of the corresponding queue
612 type = (task.threadingPolicy & MFX_TASK_DEDICATED) ? (MFX_TYPE_HARDWARE) : (MFX_TYPE_SOFTWARE);
613 ppTemp = m_pTasks[task.priority] + type;
614 while (*ppTemp)
615 {
616 ppTemp = &((*ppTemp)->pNext);
617 }
618
619 // add the task to the end of the corresponding queue
620 *ppTemp = pTask;
621
622 // reset all 'waiting' tasks to prevent freezing
623 // so called 'permanent' tasks.
624 ResetWaitingTasks(pTask->param.task.pOwner);
625
626 mfxU32 num_hw_threads = 0, num_sw_threads = 0;
627
628 // increment the number of available tasks
629 if (MFX_TASK_DEDICATED & task.threadingPolicy) {
630 num_hw_threads = numThreads;
631 } else {
632 num_sw_threads = numThreads;
633 }
634
635 // wake up working threads if task has resolved dependencies
636 if (IsReadyToRun(pTask)) {
637 WakeUpThreads(num_hw_threads, num_sw_threads);
638 }
639
640 // leave the protected section
641 }
642
643 return MFX_ERR_NONE;
644
645 }
646
DoWork()647 mfxStatus mfxSchedulerCore::DoWork()
648 {
649 return MFX_ERR_UNSUPPORTED;
650 } // mfxStatus mfxSchedulerCore::DoWork()
651
652
653
654
655