1 // Copyright (c) 2018-2020 Intel Corporation
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a copy
4 // of this software and associated documentation files (the "Software"), to deal
5 // in the Software without restriction, including without limitation the rights
6 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7 // copies of the Software, and to permit persons to whom the Software is
8 // furnished to do so, subject to the following conditions:
9 //
10 // The above copyright notice and this permission notice shall be included in all
11 // copies or substantial portions of the Software.
12 //
13 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19 // SOFTWARE.
20 
21 #include <mfx_scheduler_core.h>
22 
23 #include <mfx_scheduler_core_task.h>
24 #include <mfx_scheduler_core_handle.h>
25 #include <mfx_trace.h>
26 
27 #include <vm_time.h>
28 #include <vm_sys_info.h>
29 #include <algorithm>
30 
31 
mfxSchedulerCore(void)32 mfxSchedulerCore::mfxSchedulerCore(void)
33     :  m_currentTimeStamp(0)
34     // since on Linux we have blocking synchronization which means an absence of polling,
35     // there is no need to use 'waiting' time period.
36     , m_timeWaitPeriod(0)
37     , m_hwWakeUpThread()
38     , m_DedicatedThreadsToWakeUp(0)
39     , m_RegularThreadsToWakeUp(0)
40 {
41     memset(&m_param, 0, sizeof(m_param));
42     m_refCounter = 1;
43 
44     memset(m_workingTime, 0, sizeof(m_workingTime));
45     m_timeIdx = 0;
46 
47     m_bQuit = false;
48 
49     m_pThreadCtx = NULL;
50     m_vmtick_msec_frequency = vm_time_get_frequency()/1000;
51 
52     // reset task variables
53     memset(m_pTasks, 0, sizeof(m_pTasks));
54     memset(m_numAssignedTasks, 0, sizeof(m_numAssignedTasks));
55     m_pFailedTasks = NULL;
56 
57     m_pFreeTasks = NULL;
58 
59     // reset dependency table variables
60     m_numDependencies = 0;
61 
62     // reset busy objects table
63     m_numOccupancies = 0;
64 
65     // reset task counters
66     m_taskCounter = 0;
67     m_jobCounter = 0;
68 
69     m_hwEventCounter = 0;
70 
71     m_timer_hw_event = MFX_THREAD_TIME_TO_WAIT;
72 
73     // set number of free tasks
74     m_freeTasksCount = MFX_MAX_NUMBER_TASK;
75 
76 } // mfxSchedulerCore::mfxSchedulerCore(void)
77 
~mfxSchedulerCore(void)78 mfxSchedulerCore::~mfxSchedulerCore(void)
79 {
80     Close();
81 
82 } // mfxSchedulerCore::~mfxSchedulerCore(void)
83 
SetScheduling(std::thread & handle)84 bool mfxSchedulerCore::SetScheduling(std::thread& handle)
85 {
86     (void)handle;
87     if (m_param.params.SchedulingType || m_param.params.Priority) {
88         if (handle.joinable()) {
89             struct sched_param param{};
90 
91             param.sched_priority = m_param.params.Priority;
92             return !pthread_setschedparam(handle.native_handle(), m_param.params.SchedulingType, &param);
93         }
94     }
95     return true;
96 }
97 
SetThreadsAffinityToSockets(void)98 void mfxSchedulerCore::SetThreadsAffinityToSockets(void)
99 {
100 }
101 
Close(void)102 void mfxSchedulerCore::Close(void)
103 {
104     StopWakeUpThread();
105 
106     // stop threads
107     if (m_pThreadCtx)
108     {
109         mfxU32 i;
110 
111         // set the 'quit' flag for threads
112         m_bQuit = true;
113 
114         {
115             // set the events to wake up sleeping threads
116             std::lock_guard<std::mutex> guard(m_guard);
117             WakeUpThreads();
118         }
119 
120         for (i = 0; i < m_param.numberOfThreads; i += 1)
121         {
122             // wait for particular thread
123             if (m_pThreadCtx[i].threadHandle.joinable())
124                 m_pThreadCtx[i].threadHandle.join();
125         }
126 
127         delete[] m_pThreadCtx;
128     }
129 
130     // run over the task lists and abort the existing tasks
131     ForEachTask(
132         [](MFX_SCHEDULER_TASK* task)
133         {
134             if (MFX_TASK_WORKING == task->curStatus)
135             {
136                 task->CompleteTask(MFX_ERR_ABORTED);
137             }
138         }
139     );
140 
141     // delete task objects
142     for (auto & it : m_ppTaskLookUpTable)
143     {
144         delete it;
145         it = nullptr;
146     }
147 
148 
149     memset(&m_param, 0, sizeof(m_param));
150 
151     memset(m_workingTime, 0, sizeof(m_workingTime));
152     m_timeIdx = 0;
153 
154     // reset variables
155     m_bQuit = false;
156     m_pThreadCtx = NULL;
157     // reset task variables
158     memset(m_pTasks, 0, sizeof(m_pTasks));
159     memset(m_numAssignedTasks, 0, sizeof(m_numAssignedTasks));
160     m_pFailedTasks = NULL;
161 
162     m_pFreeTasks = NULL;
163 
164     // reset dependency table variables
165     m_numDependencies = 0;
166 
167     // reset busy objects table
168     m_numOccupancies = 0;
169 
170     // reset task counters
171     m_taskCounter = 0;
172     m_jobCounter = 0;
173 }
174 
WakeUpThreads(mfxU32 num_dedicated_threads,mfxU32 num_regular_threads)175 void mfxSchedulerCore::WakeUpThreads(mfxU32 num_dedicated_threads, mfxU32 num_regular_threads)
176 {
177     if (m_param.flags == MFX_SINGLE_THREAD)
178         return;
179 
180     MFX_SCHEDULER_THREAD_CONTEXT* thctx;
181 
182     if (num_dedicated_threads) {
183         // we have single dedicated thread, thus no loop here
184         thctx = GetThreadCtx(0);
185         if (thctx->state == MFX_SCHEDULER_THREAD_CONTEXT::Waiting) {
186             thctx->taskAdded.notify_one();
187         }
188     }
189     // if we have woken up dedicated thread, we exclude it from the loop below
190     for (mfxU32 i = (num_dedicated_threads)? 1: 0; (i < m_param.numberOfThreads) && num_regular_threads; ++i) {
191         thctx = GetThreadCtx(i);
192         if (thctx->state == MFX_SCHEDULER_THREAD_CONTEXT::Waiting) {
193             thctx->taskAdded.notify_one();
194             --num_regular_threads;
195         }
196     }
197 }
198 
Wait(const mfxU32 curThreadNum,std::unique_lock<std::mutex> & mutex)199 void mfxSchedulerCore::Wait(const mfxU32 curThreadNum, std::unique_lock<std::mutex>& mutex)
200 {
201     MFX_SCHEDULER_THREAD_CONTEXT* thctx = GetThreadCtx(curThreadNum);
202 
203     if (thctx) {
204         thctx->taskAdded.wait(mutex);
205     }
206 }
207 
GetHighPerformanceCounter(void)208 mfxU64 mfxSchedulerCore::GetHighPerformanceCounter(void)
209 {
210     return (mfxU64) vm_time_get_tick();
211 
212 } // mfxU64 mfxSchedulerCore::GetHighPerformanceCounter(void)
213 
GetLowResCurrentTime(void)214 mfxU32 mfxSchedulerCore::GetLowResCurrentTime(void)
215 {
216     return vm_time_get_current_time();
217 
218 } // mfxU32 mfxSchedulerCore::GetCurrentTime(void)
AllocateEmptyTask(void)219 mfxStatus mfxSchedulerCore::AllocateEmptyTask(void)
220 {
221     //
222     // THE EXECUTION IS ALREADY IN SECURE SECTION.
223     // Just do what need to do.
224     //
225 
226     // Clean up task queues
227     ScrubCompletedTasks();
228 
229     // allocate one new task
230     if (nullptr == m_pFreeTasks)
231     {
232 
233 
234         // the maximum allowed number of tasks is reached
235         if (MFX_MAX_NUMBER_TASK <= m_taskCounter)
236         {
237             return MFX_WRN_DEVICE_BUSY;
238         }
239 
240         // allocate one more task
241         try
242         {
243             m_pFreeTasks = new MFX_SCHEDULER_TASK(m_taskCounter++, this);
244         }
245         catch(...)
246         {
247             return MFX_WRN_DEVICE_BUSY;
248         }
249         // register the task in the look up table
250         m_ppTaskLookUpTable[m_pFreeTasks->taskID] = m_pFreeTasks;
251     }
252     memset(&(m_pFreeTasks->param), 0, sizeof(m_pFreeTasks->param));
253     // increment job number. This number must grow evenly.
254     // make job number 0 an invalid value to avoid problem with
255     // task number 0 with job number 0, which are NULL when being combined.
256     m_jobCounter += 1;
257     if (MFX_MAX_NUMBER_JOB <= m_jobCounter)
258     {
259         m_jobCounter = 1;
260     }
261     m_pFreeTasks->jobID = m_jobCounter;
262 
263     return MFX_ERR_NONE;
264 
265 } // mfxStatus mfxSchedulerCore::AllocateEmptyTask(void)
266 
GetOccupancyTableIndex(mfxU32 & idx,const MFX_TASK * pTask)267 mfxStatus mfxSchedulerCore::GetOccupancyTableIndex(mfxU32 &idx,
268                                                    const MFX_TASK *pTask)
269 {
270     mfxU32 i = 0;
271     MFX_THREAD_ASSIGNMENT *pAssignment = NULL;
272 
273     //
274     // THE EXECUTION IS ALREADY IN SECURE SECTION.
275     // Just do what need to do.
276     //
277 
278     // check the table, decrement the number of used entries
279     while ((m_numOccupancies) &&
280            (0 == m_occupancyTable[m_numOccupancies - 1].m_numRefs))
281     {
282         m_numOccupancies -= 1;
283     }
284 
285     // find the existing element with the given pState and pRoutine
286     for (i = 0; i < m_numOccupancies; i += 1)
287     {
288         if ((m_occupancyTable[i].pState == pTask->entryPoint.pState) &&
289             (m_occupancyTable[i].pRoutine == pTask->entryPoint.pRoutine))
290         {
291             // check the type of other tasks using this table entry
292             if (m_occupancyTable[i].threadingPolicy != pTask->threadingPolicy)
293             {
294                 return MFX_ERR_INVALID_VIDEO_PARAM;
295             }
296 
297             pAssignment = &(m_occupancyTable[i]);
298             break;
299         }
300     }
301 
302     // if the element exist, check the parameters for compatibility
303     if (pAssignment)
304     {
305         // actually, there is nothing to do
306     }
307     // allocate one more element in the array
308     else
309     {
310         for (i = 0; i < m_numOccupancies; i += 1)
311         {
312             if (0 == m_occupancyTable[i].m_numRefs)
313             {
314                 break;
315             }
316         }
317         // we can't reallocate the table
318         if (m_occupancyTable.size() == i)
319         {
320             return MFX_WRN_DEVICE_BUSY;
321         }
322 
323         pAssignment = &(m_occupancyTable[i]);
324 
325         // fill the parameters
326         memset(pAssignment, 0, sizeof(MFX_THREAD_ASSIGNMENT));
327         pAssignment->pState = pTask->entryPoint.pState;
328         pAssignment->pRoutine = pTask->entryPoint.pRoutine;
329         pAssignment->threadingPolicy = pTask->threadingPolicy;
330     }
331 
332     // update the number of allocated objects
333     m_numOccupancies = std::max(mfxU32(m_numOccupancies), i + 1);
334 
335     // save the index to return
336     idx = i;
337 
338     return MFX_ERR_NONE;
339 
340 } // mfxStatus mfxSchedulerCore::GetOccupancyTableIndex(mfxU32 &idx,
341 
ScrubCompletedTasks(bool bComprehensive)342 void mfxSchedulerCore::ScrubCompletedTasks(bool bComprehensive)
343 {
344     int priority;
345 
346     //
347     // THE EXECUTION IS ALREADY IN SECURE SECTION.
348     // Just do what need to do.
349     //
350 
351     for (priority = MFX_PRIORITY_HIGH;
352          priority >= MFX_PRIORITY_LOW;
353          priority -= 1)
354     {
355         int type;
356 
357         for (type = MFX_TYPE_HARDWARE; type <= MFX_TYPE_SOFTWARE; type += 1)
358         {
359             MFX_SCHEDULER_TASK **ppCur;
360 
361             // if there is an empty task, immediately return
362             if ((false == bComprehensive) &&
363                 (m_pFreeTasks))
364             {
365                 return;
366             }
367 
368             ppCur = m_pTasks[priority] + type;
369             while (*ppCur)
370             {
371                 // move task completed to the 'free' queue.
372                 if (MFX_ERR_NONE == (*ppCur)->opRes)
373                 {
374                     MFX_SCHEDULER_TASK *pTemp;
375 
376                     // cut the task from the queue
377                     pTemp = *ppCur;
378                     *ppCur = pTemp->pNext;
379                     // add it to the 'free' queue
380                     pTemp->pNext = m_pFreeTasks;
381                     m_pFreeTasks = pTemp;
382                 }
383                 // move task failed to the 'failed' queue.
384                 else if ((MFX_ERR_NONE != (*ppCur)->opRes) &&
385                          (MFX_WRN_IN_EXECUTION != (*ppCur)->opRes))
386                 {
387                     MFX_SCHEDULER_TASK *pTemp;
388 
389                     // cut the task from the queue
390                     pTemp = *ppCur;
391                     *ppCur = pTemp->pNext;
392                     // add it to the 'failed' queue
393                     pTemp->pNext = m_pFailedTasks;
394                     m_pFailedTasks = pTemp;
395                 }
396                 else
397                 {
398                     // set the next task
399                     ppCur = &((*ppCur)->pNext);
400                 }
401             }
402         }
403     }
404 
405 } // void mfxSchedulerCore::ScrubCompletedTasks(bool bComprehensive)
406 
RegisterTaskDependencies(MFX_SCHEDULER_TASK * pTask)407 void mfxSchedulerCore::RegisterTaskDependencies(MFX_SCHEDULER_TASK  *pTask)
408 {
409     mfxU32 i, tableIdx, remainInputs;
410     const void *pSrcCopy[MFX_TASK_NUM_DEPENDENCIES];
411     mfxStatus taskRes = MFX_WRN_IN_EXECUTION;
412 
413     //
414     // THE EXECUTION IS ALREADY IN SECURE SECTION.
415     // Just do what need to do.
416     //
417 
418     // check if the table have empty position(s),
419     // If so decrement the index of the last table entry.
420     if (m_pDependencyTable.size() > m_numDependencies)
421     {
422         auto it = std::find_if(m_pDependencyTable.rend() - m_numDependencies, m_pDependencyTable.rend(),
423                                [](const MFX_DEPENDENCY_ITEM & item){ return item.p != nullptr; });
424 
425         m_numDependencies = m_pDependencyTable.rend() - it;
426     }
427 
428     // get the number of source dependencies
429     remainInputs = 0;
430     for (i = 0; i < MFX_TASK_NUM_DEPENDENCIES; i += 1)
431     {
432         // make a copy of source dependencies.
433         // source dependencies have to be swept, because of duplication in
434         // the dependency table. task will sync on the first matching entry.
435         pSrcCopy[i] = pTask->param.task.pSrc[i];
436         if (pSrcCopy[i])
437         {
438             remainInputs += 1;
439         }
440     }
441 
442     // run over the table and save the handles of incomplete inputs
443     for (tableIdx = 0; tableIdx < m_numDependencies; tableIdx += 1)
444     {
445         // compare only filled table entries
446         if (m_pDependencyTable[tableIdx].p)
447         {
448             for (i = 0; i < MFX_TASK_NUM_DEPENDENCIES; i += 1)
449             {
450                 // we found the source dependency,
451                 // save the handle
452                 if (m_pDependencyTable[tableIdx].p == pSrcCopy[i])
453                 {
454                     // dependency is fail. The dependency resolved, but failed.
455                     if (MFX_WRN_IN_EXECUTION != m_pDependencyTable[tableIdx].mfxRes)
456                     {
457                         // waiting task inherits status from the parent task
458                         // need to propogate error status to all dependent tasks.
459                         //if (MFX_TASK_WAIT & pTask->param.task.threadingPolicy)
460                         {
461                             taskRes = m_pDependencyTable[tableIdx].mfxRes;
462                         }
463                         //// all other tasks are aborted
464                         //else
465                         //{
466                         //    taskRes = MFX_ERR_ABORTED;
467                         //}
468                     }
469                     // link dependency
470                     else
471                     {
472                         m_pDependencyTable[tableIdx].pTask->SetDependentItem(pTask, i);
473                     }
474                     // sweep already used dependency
475                     pSrcCopy[i] = NULL;
476                     remainInputs -= 1;
477                     break;
478                 }
479             }
480 
481             // is there more source dependencies?
482             if (0 == remainInputs)
483             {
484                 break;
485             }
486         }
487     }
488 
489     // run over the table and register generated outputs
490     tableIdx = 0;
491     for (i = 0; i < MFX_TASK_NUM_DEPENDENCIES; i += 1)
492     {
493         if (pTask->param.task.pDst[i])
494         {
495             // find empty table entry
496             while (m_pDependencyTable.at(tableIdx).p)
497             {
498                 tableIdx += 1;
499             }
500 
501             // save the generated dependency
502             m_pDependencyTable[tableIdx].p = pTask->param.task.pDst[i];
503             m_pDependencyTable[tableIdx].mfxRes = taskRes;
504             m_pDependencyTable[tableIdx].pTask = pTask;
505 
506             // save the index of the output
507             pTask->param.dependencies.dstIdx[i] = tableIdx;
508             tableIdx += 1;
509         }
510     }
511 
512     // update the dependency table max index
513     if (tableIdx >= m_numDependencies)
514     {
515         m_numDependencies = tableIdx;
516     }
517 
518     // if dependency were failed,
519     // set the task into the 'aborted' state
520     if (MFX_WRN_IN_EXECUTION != taskRes)
521     {
522         // save the status
523         m_pFreeTasks->curStatus = taskRes;
524         m_pFreeTasks->opRes = taskRes;
525         m_pFreeTasks->done.notify_all();
526     }
527 
528 } // void mfxSchedulerCore::RegisterTaskDependencies(MFX_SCHEDULER_TASK  *pTask)
529 
530 
531 
532 //#define ENABLE_TASK_DEBUG
533 
534 
PrintTaskInfo(void)535 void mfxSchedulerCore::PrintTaskInfo(void)
536 {
537 
538 } // void mfxSchedulerCore::PrintTaskInfo(void)
539 
PrintTaskInfoUnsafe(void)540 void mfxSchedulerCore::PrintTaskInfoUnsafe(void)
541 {
542 
543 } // void mfxSchedulerCore::PrintTaskInfoUnsafe(void)
544