1 // Copyright (c) 2018-2020 Intel Corporation
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a copy
4 // of this software and associated documentation files (the "Software"), to deal
5 // in the Software without restriction, including without limitation the rights
6 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7 // copies of the Software, and to permit persons to whom the Software is
8 // furnished to do so, subject to the following conditions:
9 //
10 // The above copyright notice and this permission notice shall be included in all
11 // copies or substantial portions of the Software.
12 //
13 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19 // SOFTWARE.
20 
21 #include <mfx_scheduler_core.h>
22 
23 #include <mfx_scheduler_core_task.h>
24 #include <mfx_scheduler_core_handle.h>
25 
26 #include <vm_time.h>
27 #include <vm_sys_info.h>
28 #include <mfx_trace.h>
29 
30 #include <functional>
31 #include <cassert>
32 #include <list>
33 
// Time-out values passed to Synchronize() as its timeToWait argument
// (the multi-threaded path of Synchronize() treats the value as milliseconds).
enum
{
    // short wait, used when the tasks of an owner are polled one after another
    MFX_TIME_TO_WAIT  = 5,

    // longest wait, used when waiting for a dependency to be resolved
    MFX_TIME_INFINITE = 0x7fffffff
};
39 
Initialize(const MFX_SCHEDULER_PARAM * pParam)40 mfxStatus mfxSchedulerCore::Initialize(const MFX_SCHEDULER_PARAM *pParam)
41 {
42     MFX_SCHEDULER_PARAM2 param2;
43     memset(&param2, 0, sizeof(param2));
44     if (pParam) {
45         MFX_SCHEDULER_PARAM* casted_param2 = &param2;
46         *casted_param2 = *pParam;
47     }
48     if (!param2.numberOfThreads) {
49         // that's just in case: core, which calls scheduler, is doing
50         // exactly the same what we are doing below
51         param2.numberOfThreads = vm_sys_info_get_cpu_num();
52     }
53     return Initialize2(&param2);
54 }
55 
Initialize2(const MFX_SCHEDULER_PARAM2 * pParam)56 mfxStatus mfxSchedulerCore::Initialize2(const MFX_SCHEDULER_PARAM2 *pParam)
57 {
58     mfxU32 i;
59 
60     // release the object before initialization
61     Close();
62 
63 
64     // save the parameters
65     if (pParam)
66     {
67         m_param = *pParam;
68     }
69 
70     // clean up the task look up table
71     m_ppTaskLookUpTable.resize(MFX_MAX_NUMBER_TASK, nullptr);
72 
73     // allocate the dependency table
74     m_pDependencyTable.resize(MFX_MAX_NUMBER_TASK * 2, MFX_DEPENDENCY_ITEM());
75 
76     // allocate the thread assignment object table.
77     // its size should be equal to the number of task,
78     // larger table is not required.
79     m_occupancyTable.resize(MFX_MAX_NUMBER_TASK, MFX_THREAD_ASSIGNMENT());
80 
81     if (MFX_SINGLE_THREAD != m_param.flags)
82     {
83         if (m_param.numberOfThreads && m_param.params.NumThread) {
84             // use user-overwritten number of threads
85             m_param.numberOfThreads = m_param.params.NumThread;
86         }
87         if (!m_param.numberOfThreads) {
88             return MFX_ERR_UNSUPPORTED;
89         }
90         if (m_param.numberOfThreads == 1) {
91             // we need at least 2 threads to avoid dead locks
92             return MFX_ERR_UNSUPPORTED;
93         }
94 
95 
96         try
97         {
98             // allocate thread contexts
99             m_pThreadCtx = new MFX_SCHEDULER_THREAD_CONTEXT[m_param.numberOfThreads];
100 
101             // start threads
102             for (i = 0; i < m_param.numberOfThreads; i += 1)
103             {
104                 // prepare context
105                 m_pThreadCtx[i].threadNum = i;
106                 m_pThreadCtx[i].pSchedulerCore = this;
107 
108                 // spawn a thread
109                 m_pThreadCtx[i].threadHandle = std::thread(
110                     std::bind(&mfxSchedulerCore::ThreadProc, this, &m_pThreadCtx[i]));
111 
112                 if (!SetScheduling(m_pThreadCtx[i].threadHandle)) {
113                     return MFX_ERR_UNSUPPORTED;
114                 }
115             }
116         }
117         catch (...)
118         {
119             return MFX_ERR_MEMORY_ALLOC;
120         }
121 
122 
123         SetThreadsAffinityToSockets();
124     }
125     else
126     {
127         // to run HW listen thread. Will be enabled if tests are OK
128 
129     }
130 
131     return MFX_ERR_NONE;
132 
133 } // mfxStatus mfxSchedulerCore::Initialize(mfxSchedulerFlags flags, mfxU32 numberOfThreads)
134 
AddTask(const MFX_TASK & task,mfxSyncPoint * pSyncPoint)135 mfxStatus mfxSchedulerCore::AddTask(const MFX_TASK &task, mfxSyncPoint *pSyncPoint)
136 {
137     return AddTask(task, pSyncPoint, NULL, 0);
138 
139 } // mfxStatus mfxSchedulerCore::AddTask(const MFX_TASK &task, mfxSyncPoint *pSyncPoint)
140 
Synchronize(mfxSyncPoint syncPoint,mfxU32 timeToWait)141 mfxStatus mfxSchedulerCore::Synchronize(mfxSyncPoint syncPoint, mfxU32 timeToWait)
142 {
143     mfxTaskHandle handle;
144     mfxStatus mfxRes;
145 
146     // check error(s)
147     if (NULL == syncPoint)
148     {
149         return MFX_ERR_NULL_PTR;
150     }
151 
152     // cast the pointer to handle and make syncing
153     handle.handle = (size_t) syncPoint;
154 
155 
156     // waiting on task's handle
157     mfxRes = Synchronize(handle, timeToWait);
158 
159 
160     return mfxRes;
161 
162 } // mfxStatus mfxSchedulerCore::Synchronize(mfxSyncPoint syncPoint, mfxU32 timeToWait)
163 
Synchronize(mfxTaskHandle handle,mfxU32 timeToWait)164 mfxStatus mfxSchedulerCore::Synchronize(mfxTaskHandle handle, mfxU32 timeToWait)
165 {
166     // check error(s)
167     if (0 == m_param.numberOfThreads)
168     {
169         return MFX_ERR_NOT_INITIALIZED;
170     }
171 
172     // look up the task
173     MFX_SCHEDULER_TASK *pTask = m_ppTaskLookUpTable.at(handle.taskID);
174 
175     if (nullptr == pTask)
176     {
177         return MFX_ERR_NULL_PTR;
178     }
179 
180     if (MFX_SINGLE_THREAD == m_param.flags)
181     {
182         //let really run task to
183         MFX_CALL_INFO call = {};
184         mfxTaskHandle previousTaskHandle = {};
185 
186         mfxStatus task_sts = MFX_ERR_NONE;
187         mfxU64 start = GetHighPerformanceCounter();
188         mfxU64 frequency = vm_time_get_frequency();
189         while (MFX_WRN_IN_EXECUTION == pTask->opRes)
190         {
191             std::unique_lock<std::mutex> guard(m_guard);
192             task_sts = GetTask(call, previousTaskHandle, 0);
193 
194             if (task_sts != MFX_ERR_NONE)
195                 continue;
196 
197             guard.unlock();
198 
199             call.res = call.pTask->entryPoint.pRoutine(call.pTask->entryPoint.pState,
200                                                        call.pTask->entryPoint.pParam,
201                                                        call.threadNum,
202                                                        call.callNum);
203 
204             guard.lock();
205 
206             // save the previous task's handle
207             previousTaskHandle = call.taskHandle;
208 
209             MarkTaskCompleted(&call, 0);
210 
211             if ((mfxU32)((GetHighPerformanceCounter() - start)/frequency) > timeToWait)
212                 break;
213 
214             if (MFX_TASK_DONE!= call.res)
215             {
216                 IncrementHWEventCounter();
217             }
218         }
219         //
220         // inspect the task
221         //
222 
223         // the handle is outdated,
224         // the previous task job is over and completed with successful status
225         // NOTE: it make sense to read task result and job ID the following order.
226         if ((MFX_ERR_NONE == pTask->opRes) ||
227             (pTask->jobID != handle.jobID))
228         {
229             return MFX_ERR_NONE;
230         }
231 
232         // wait the result on the event
233         if (MFX_WRN_IN_EXECUTION == pTask->opRes)
234         {
235             return MFX_WRN_IN_EXECUTION;
236         }
237 
238         // check error status
239         if ((MFX_ERR_NONE != pTask->opRes) &&
240             (pTask->jobID == handle.jobID))
241         {
242             return pTask->opRes;
243         }
244 
245         // in all other cases task is complete
246         return MFX_ERR_NONE;
247     }
248     else
249     {
250         std::unique_lock<std::mutex> guard(m_guard);
251 
252         MFX_AUTO_LTRACE(MFX_TRACE_LEVEL_PRIVATE, "Scheduler::Wait");
253         MFX_LTRACE_1(MFX_TRACE_LEVEL_SCHED, "^Depends^on", "%d", pTask->param.task.nParentId);
254         MFX_LTRACE_I(MFX_TRACE_LEVEL_SCHED, timeToWait);
255 
256         pTask->done.wait_for(guard, std::chrono::milliseconds(timeToWait), [pTask, handle] {
257            return (pTask->jobID != handle.jobID) || (MFX_WRN_IN_EXECUTION != pTask->opRes);
258         });
259 
260         if (pTask->jobID == handle.jobID) {
261             return pTask->opRes;
262         } else {
263             /* Notes:
264              *  - task executes next job already, we _lost_ task status and can only assume that
265              *  everything was OK or FAILED, we will assume that task succeeded
266              */
267             return MFX_ERR_NONE;
268         }
269     }
270 }
271 
GetTimeout(mfxU32 & maxTimeToRun)272 mfxStatus mfxSchedulerCore::GetTimeout(mfxU32& maxTimeToRun)
273 {
274     (void)maxTimeToRun;
275 
276     return MFX_ERR_UNSUPPORTED;
277 }
278 
WaitForDependencyResolved(const void * pDependency)279 mfxStatus mfxSchedulerCore::WaitForDependencyResolved(const void *pDependency)
280 {
281     mfxTaskHandle waitHandle = {};
282     bool bFind = false;
283 
284     // check error(s)
285     if (0 == m_param.numberOfThreads)
286     {
287         return MFX_ERR_NOT_INITIALIZED;
288     }
289     if (NULL == pDependency)
290     {
291         return MFX_ERR_NONE;
292     }
293 
294     // find a handle to wait
295     {
296         std::lock_guard<std::mutex> guard(m_guard);
297         mfxU32 curIdx;
298 
299         for (curIdx = 0; curIdx < m_numDependencies; curIdx += 1)
300         {
301             if (m_pDependencyTable.at(curIdx).p == pDependency)
302             {
303                 // get the handle before leaving protected section
304                 waitHandle.taskID = m_pDependencyTable[curIdx].pTask->taskID;
305                 waitHandle.jobID = m_pDependencyTable[curIdx].pTask->jobID;
306 
307                 // handle is found, go to wait
308                 bFind = true;
309                 break;
310             }
311         }
312         // leave the protected section
313     }
314 
315     // the dependency is still in the table,
316     // wait until it leaves the table.
317     if (bFind)
318     {
319         return Synchronize(waitHandle, MFX_TIME_INFINITE);
320     }
321 
322     return MFX_ERR_NONE;
323 
324 } // mfxStatus mfxSchedulerCore::WaitForDependencyResolved(const void *pDependency)
325 
WaitForAllTasksCompletion(const void * pOwner)326 mfxStatus mfxSchedulerCore::WaitForAllTasksCompletion(const void *pOwner)
327 {
328     // check error(s)
329     if (0 == m_param.numberOfThreads)
330     {
331         return MFX_ERR_NOT_INITIALIZED;
332     }
333     if (NULL == pOwner)
334     {
335         return MFX_ERR_NULL_PTR;
336     }
337 
338     // make sure that threads are running
339     {
340         std::lock_guard<std::mutex> guard(m_guard);
341 
342         ResetWaitingTasks(pOwner);
343         WakeUpThreads();
344     }
345 
346     std::list<mfxTaskHandle> tasks;
347 
348     {
349         std::lock_guard<std::mutex> guard(m_guard);
350 
351         ForEachTask(
352             [&pOwner, &tasks](MFX_SCHEDULER_TASK* task)
353             {
354                 //make a list of all 'active' tasks of given owner
355                 if ((task->param.task.pOwner == pOwner) && (MFX_WRN_IN_EXECUTION == task->opRes))
356                 {
357                     mfxTaskHandle waitHandle;
358                     waitHandle.taskID = task->taskID;
359                     waitHandle.jobID = task->jobID;
360 
361                     tasks.emplace_back(waitHandle);
362                 }
363             }
364         );
365     }
366 
367     auto handle = tasks.begin();
368     while (!tasks.empty())
369     {
370         if (tasks.end() == handle)
371             handle = tasks.begin();
372 
373         // wait for a while
374         // for a while and infinite are different things
375         if (MFX_WRN_IN_EXECUTION == Synchronize(*handle, MFX_TIME_TO_WAIT)) // Still working
376         {
377             handle++;
378         }
379         else // Done or failed
380         {
381             handle = tasks.erase(handle);
382         }
383     }
384 
385     return MFX_ERR_NONE;
386 
387 } // mfxStatus mfxSchedulerCore::WaitForAllTasksCompletion(const void *pOwner)
388 
ResetWaitingStatus(const void * pOwner)389 mfxStatus mfxSchedulerCore::ResetWaitingStatus(const void *pOwner)
390 {
391     // reset 'waiting' tasks belong to the given state
392     ResetWaitingTasks(pOwner);
393 
394     std::lock_guard<std::mutex> guard(m_guard);
395 
396     // wake up sleeping threads
397     WakeUpThreads();
398 
399     return MFX_ERR_NONE;
400 
401 } // mfxStatus mfxSchedulerCore::ResetWaitingStatus(const void *pOwner)
402 
GetState(void)403 mfxStatus mfxSchedulerCore::GetState(void)
404 {
405     // check error(s)
406     if (0 == m_param.numberOfThreads)
407     {
408         return MFX_ERR_NOT_INITIALIZED;
409     }
410 
411     return MFX_ERR_NONE;
412 
413 } // mfxStatus mfxSchedulerCore::GetState(void)
414 
GetParam(MFX_SCHEDULER_PARAM * pParam)415 mfxStatus mfxSchedulerCore::GetParam(MFX_SCHEDULER_PARAM *pParam)
416 {
417     // check error(s)
418     if (0 == m_param.numberOfThreads)
419     {
420         return MFX_ERR_NOT_INITIALIZED;
421     }
422     if (NULL == pParam)
423     {
424         return MFX_ERR_NULL_PTR;
425     }
426 
427     // copy the parameters
428     *pParam = m_param;
429 
430     return MFX_ERR_NONE;
431 
432 } // mfxStatus mfxSchedulerCore::GetParam(MFX_SCHEDULER_PARAM *pParam)
433 
Reset(void)434 mfxStatus mfxSchedulerCore::Reset(void)
435 {
436     // check error(s)
437     if (0 == m_param.numberOfThreads)
438     {
439         return MFX_ERR_NOT_INITIALIZED;
440     }
441 
442     if (NULL == m_pFailedTasks)
443     {
444         return MFX_ERR_NONE;
445     }
446 
447     // enter guarded section
448     {
449         std::lock_guard<std::mutex> guard(m_guard);
450 
451         // clean up the working queue
452         ScrubCompletedTasks(true);
453     }
454 
455     return MFX_ERR_NONE;
456 
457 } // mfxStatus mfxSchedulerCore::Reset(void)
458 
AdjustPerformance(const mfxSchedulerMessage message)459 mfxStatus mfxSchedulerCore::AdjustPerformance(const mfxSchedulerMessage message)
460 {
461     mfxStatus mfxRes = MFX_ERR_NONE;
462 
463     // check error(s)
464     if (0 == m_param.numberOfThreads)
465     {
466         return MFX_ERR_NOT_INITIALIZED;
467     }
468 
469     switch(message)
470     {
471         // reset the scheduler to the performance default state
472     case MFX_SCHEDULER_RESET_TO_DEFAULTS:
473         break;
474 
475     case MFX_SCHEDULER_START_HW_LISTENING:
476         if (m_param.flags != MFX_SINGLE_THREAD)
477         {
478             mfxRes = StartWakeUpThread();
479         }
480         break;
481 
482     case MFX_SCHEDULER_STOP_HW_LISTENING:
483         if (m_param.flags != MFX_SINGLE_THREAD)
484         {
485             mfxRes = StopWakeUpThread();
486         }
487         break;
488 
489         // unknown message
490     default:
491         mfxRes = MFX_ERR_UNKNOWN;
492         break;
493     }
494 
495     return mfxRes;
496 
497 } // mfxStatus mfxSchedulerCore::AdjustPerformance(const mfxSchedulerMessage message)
498 
499 
AddTask(const MFX_TASK & task,mfxSyncPoint * pSyncPoint,const char * pFileName,int lineNumber)500 mfxStatus mfxSchedulerCore::AddTask(const MFX_TASK &task, mfxSyncPoint *pSyncPoint,
501                                     const char *pFileName, int lineNumber)
502 {
503 #ifdef MFX_TRACE_ENABLE
504     MFX_LTRACE_1(MFX_TRACE_LEVEL_SCHED, "^Enqueue^", "%d", task.nTaskId);
505 #endif
506 
507 
508     // check error(s)
509     if (0 == m_param.numberOfThreads)
510     {
511         return MFX_ERR_NOT_INITIALIZED;
512     }
513     if ((NULL == task.entryPoint.pRoutine) ||
514         (NULL == pSyncPoint))
515     {
516         return MFX_ERR_NULL_PTR;
517     }
518 
519 
520     // enter protected section
521     {
522         std::unique_lock<std::mutex> guard(m_guard);
523         // make sure that there is enough free task objects
524         m_freeTasks.wait(guard, [this](){return m_freeTasksCount > 0;});
525         --m_freeTasksCount;
526         mfxStatus mfxRes;
527         MFX_SCHEDULER_TASK *pTask, **ppTemp;
528         mfxTaskHandle handle;
529         MFX_THREAD_ASSIGNMENT *pAssignment = nullptr;
530         mfxU32 occupancyIdx;
531         int type;
532 
533         // Make sure that there is an empty task object
534 
535         mfxRes = AllocateEmptyTask();
536         if (MFX_ERR_NONE != mfxRes)
537         {
538             // better to return error instead of WRN  (two-tasks per component scheme)
539             return MFX_ERR_MEMORY_ALLOC;
540         }
541 
542         // initialize the task
543         m_pFreeTasks->ResetDependency();
544         mfxRes = m_pFreeTasks->Reset();
545         if (MFX_ERR_NONE != mfxRes)
546         {
547             return mfxRes;
548         }
549         m_pFreeTasks->param.task = task;
550         mfxRes = GetOccupancyTableIndex(occupancyIdx, &task);
551         if (MFX_ERR_NONE != mfxRes)
552         {
553             return mfxRes;
554         }
555         if (m_occupancyTable.size() <= occupancyIdx)
556         {
557             return MFX_ERR_UNDEFINED_BEHAVIOR;
558         }
559         pAssignment = &(m_occupancyTable[occupancyIdx]);
560 
561         // update the thread assignment parameters
562         if (MFX_TASK_INTRA & task.threadingPolicy)
563         {
564             // last entries in the dependency arrays must be empty
565             if ((m_pFreeTasks->param.task.pSrc[MFX_TASK_NUM_DEPENDENCIES - 1]) ||
566                 (m_pFreeTasks->param.task.pDst[MFX_TASK_NUM_DEPENDENCIES - 1]))
567             {
568                 return MFX_ERR_INVALID_VIDEO_PARAM;
569             }
570 
571             // fill INTRA task dependencies
572             m_pFreeTasks->param.task.pSrc[MFX_TASK_NUM_DEPENDENCIES - 1] = pAssignment->pLastTask;
573             m_pFreeTasks->param.task.pDst[MFX_TASK_NUM_DEPENDENCIES - 1] = m_pFreeTasks;
574             // update the last intra task pointer
575             pAssignment->pLastTask = m_pFreeTasks;
576         }
577         // do not save the pointer to thread assigment instance
578         // until all checking have been done
579         m_pFreeTasks->param.pThreadAssignment = pAssignment;
580         pAssignment->m_numRefs += 1;
581 
582         // saturate the number of available threads
583         uint32_t numThreads = m_pFreeTasks->param.task.entryPoint.requiredNumThreads;
584         numThreads = (0 == numThreads) ? m_param.numberOfThreads : numThreads;
585 
586         numThreads = std::min<uint32_t>({m_param.numberOfThreads, numThreads, sizeof(pAssignment->threadMask) * 8});
587         m_pFreeTasks->param.task.entryPoint.requiredNumThreads = numThreads;
588 
589         // set the advanced task's info
590         m_pFreeTasks->param.sourceInfo.pFileName = pFileName;
591         m_pFreeTasks->param.sourceInfo.lineNumber = lineNumber;
592         // set the sync point for the task
593         handle.handle = 0;
594         handle.taskID = m_pFreeTasks->taskID;
595         handle.jobID = m_pFreeTasks->jobID;
596         *pSyncPoint = (mfxSyncPoint) handle.handle;
597 
598         // Register task dependencies
599         RegisterTaskDependencies(m_pFreeTasks);
600 
601 
602         //
603         // move task to the corresponding task
604         //
605 
606         // remove the task from the 'free' queue
607         pTask = m_pFreeTasks;
608         m_pFreeTasks = m_pFreeTasks->pNext;
609         pTask->pNext = NULL;
610 
611         // find the end of the corresponding queue
612         type = (task.threadingPolicy & MFX_TASK_DEDICATED) ? (MFX_TYPE_HARDWARE) : (MFX_TYPE_SOFTWARE);
613         ppTemp = m_pTasks[task.priority] + type;
614         while (*ppTemp)
615         {
616             ppTemp = &((*ppTemp)->pNext);
617         }
618 
619         // add the task to the end of the corresponding queue
620         *ppTemp = pTask;
621 
622         // reset all 'waiting' tasks to prevent freezing
623         // so called 'permanent' tasks.
624         ResetWaitingTasks(pTask->param.task.pOwner);
625 
626         mfxU32 num_hw_threads = 0, num_sw_threads = 0;
627 
628         // increment the number of available tasks
629         if (MFX_TASK_DEDICATED & task.threadingPolicy) {
630             num_hw_threads = numThreads;
631         } else {
632             num_sw_threads = numThreads;
633         }
634 
635         // wake up working threads if task has resolved dependencies
636         if (IsReadyToRun(pTask)) {
637             WakeUpThreads(num_hw_threads, num_sw_threads);
638         }
639 
640         // leave the protected section
641     }
642 
643     return MFX_ERR_NONE;
644 
645 }
646 
DoWork()647 mfxStatus mfxSchedulerCore::DoWork()
648 {
649     return MFX_ERR_UNSUPPORTED;
650 } // mfxStatus mfxSchedulerCore::DoWork()
651 
652 
653 
654 
655