1 // Copyright (c) 2018-2020 Intel Corporation
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a copy
4 // of this software and associated documentation files (the "Software"), to deal
5 // in the Software without restriction, including without limitation the rights
6 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7 // copies of the Software, and to permit persons to whom the Software is
8 // furnished to do so, subject to the following conditions:
9 //
10 // The above copyright notice and this permission notice shall be included in all
11 // copies or substantial portions of the Software.
12 //
13 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19 // SOFTWARE.
20
21 #include <mfx_scheduler_core.h>
22
23 #include <mfx_scheduler_core_task.h>
24 #include <mfx_scheduler_core_handle.h>
25 #include <mfx_trace.h>
26
27 #include <vm_time.h>
28 #include <vm_sys_info.h>
29 #include <algorithm>
30
31
mfxSchedulerCore(void)32 mfxSchedulerCore::mfxSchedulerCore(void)
33 : m_currentTimeStamp(0)
34 // since on Linux we have blocking synchronization which means an absence of polling,
35 // there is no need to use 'waiting' time period.
36 , m_timeWaitPeriod(0)
37 , m_hwWakeUpThread()
38 , m_DedicatedThreadsToWakeUp(0)
39 , m_RegularThreadsToWakeUp(0)
40 {
41 memset(&m_param, 0, sizeof(m_param));
42 m_refCounter = 1;
43
44 memset(m_workingTime, 0, sizeof(m_workingTime));
45 m_timeIdx = 0;
46
47 m_bQuit = false;
48
49 m_pThreadCtx = NULL;
50 m_vmtick_msec_frequency = vm_time_get_frequency()/1000;
51
52 // reset task variables
53 memset(m_pTasks, 0, sizeof(m_pTasks));
54 memset(m_numAssignedTasks, 0, sizeof(m_numAssignedTasks));
55 m_pFailedTasks = NULL;
56
57 m_pFreeTasks = NULL;
58
59 // reset dependency table variables
60 m_numDependencies = 0;
61
62 // reset busy objects table
63 m_numOccupancies = 0;
64
65 // reset task counters
66 m_taskCounter = 0;
67 m_jobCounter = 0;
68
69 m_hwEventCounter = 0;
70
71 m_timer_hw_event = MFX_THREAD_TIME_TO_WAIT;
72
73 // set number of free tasks
74 m_freeTasksCount = MFX_MAX_NUMBER_TASK;
75
76 } // mfxSchedulerCore::mfxSchedulerCore(void)
77
~mfxSchedulerCore(void)78 mfxSchedulerCore::~mfxSchedulerCore(void)
79 {
80 Close();
81
82 } // mfxSchedulerCore::~mfxSchedulerCore(void)
83
SetScheduling(std::thread & handle)84 bool mfxSchedulerCore::SetScheduling(std::thread& handle)
85 {
86 (void)handle;
87 if (m_param.params.SchedulingType || m_param.params.Priority) {
88 if (handle.joinable()) {
89 struct sched_param param{};
90
91 param.sched_priority = m_param.params.Priority;
92 return !pthread_setschedparam(handle.native_handle(), m_param.params.SchedulingType, ¶m);
93 }
94 }
95 return true;
96 }
97
SetThreadsAffinityToSockets(void)98 void mfxSchedulerCore::SetThreadsAffinityToSockets(void)
99 {
100 }
101
Close(void)102 void mfxSchedulerCore::Close(void)
103 {
104 StopWakeUpThread();
105
106 // stop threads
107 if (m_pThreadCtx)
108 {
109 mfxU32 i;
110
111 // set the 'quit' flag for threads
112 m_bQuit = true;
113
114 {
115 // set the events to wake up sleeping threads
116 std::lock_guard<std::mutex> guard(m_guard);
117 WakeUpThreads();
118 }
119
120 for (i = 0; i < m_param.numberOfThreads; i += 1)
121 {
122 // wait for particular thread
123 if (m_pThreadCtx[i].threadHandle.joinable())
124 m_pThreadCtx[i].threadHandle.join();
125 }
126
127 delete[] m_pThreadCtx;
128 }
129
130 // run over the task lists and abort the existing tasks
131 ForEachTask(
132 [](MFX_SCHEDULER_TASK* task)
133 {
134 if (MFX_TASK_WORKING == task->curStatus)
135 {
136 task->CompleteTask(MFX_ERR_ABORTED);
137 }
138 }
139 );
140
141 // delete task objects
142 for (auto & it : m_ppTaskLookUpTable)
143 {
144 delete it;
145 it = nullptr;
146 }
147
148
149 memset(&m_param, 0, sizeof(m_param));
150
151 memset(m_workingTime, 0, sizeof(m_workingTime));
152 m_timeIdx = 0;
153
154 // reset variables
155 m_bQuit = false;
156 m_pThreadCtx = NULL;
157 // reset task variables
158 memset(m_pTasks, 0, sizeof(m_pTasks));
159 memset(m_numAssignedTasks, 0, sizeof(m_numAssignedTasks));
160 m_pFailedTasks = NULL;
161
162 m_pFreeTasks = NULL;
163
164 // reset dependency table variables
165 m_numDependencies = 0;
166
167 // reset busy objects table
168 m_numOccupancies = 0;
169
170 // reset task counters
171 m_taskCounter = 0;
172 m_jobCounter = 0;
173 }
174
WakeUpThreads(mfxU32 num_dedicated_threads,mfxU32 num_regular_threads)175 void mfxSchedulerCore::WakeUpThreads(mfxU32 num_dedicated_threads, mfxU32 num_regular_threads)
176 {
177 if (m_param.flags == MFX_SINGLE_THREAD)
178 return;
179
180 MFX_SCHEDULER_THREAD_CONTEXT* thctx;
181
182 if (num_dedicated_threads) {
183 // we have single dedicated thread, thus no loop here
184 thctx = GetThreadCtx(0);
185 if (thctx->state == MFX_SCHEDULER_THREAD_CONTEXT::Waiting) {
186 thctx->taskAdded.notify_one();
187 }
188 }
189 // if we have woken up dedicated thread, we exclude it from the loop below
190 for (mfxU32 i = (num_dedicated_threads)? 1: 0; (i < m_param.numberOfThreads) && num_regular_threads; ++i) {
191 thctx = GetThreadCtx(i);
192 if (thctx->state == MFX_SCHEDULER_THREAD_CONTEXT::Waiting) {
193 thctx->taskAdded.notify_one();
194 --num_regular_threads;
195 }
196 }
197 }
198
Wait(const mfxU32 curThreadNum,std::unique_lock<std::mutex> & mutex)199 void mfxSchedulerCore::Wait(const mfxU32 curThreadNum, std::unique_lock<std::mutex>& mutex)
200 {
201 MFX_SCHEDULER_THREAD_CONTEXT* thctx = GetThreadCtx(curThreadNum);
202
203 if (thctx) {
204 thctx->taskAdded.wait(mutex);
205 }
206 }
207
GetHighPerformanceCounter(void)208 mfxU64 mfxSchedulerCore::GetHighPerformanceCounter(void)
209 {
210 return (mfxU64) vm_time_get_tick();
211
212 } // mfxU64 mfxSchedulerCore::GetHighPerformanceCounter(void)
213
GetLowResCurrentTime(void)214 mfxU32 mfxSchedulerCore::GetLowResCurrentTime(void)
215 {
216 return vm_time_get_current_time();
217
218 } // mfxU32 mfxSchedulerCore::GetCurrentTime(void)
AllocateEmptyTask(void)219 mfxStatus mfxSchedulerCore::AllocateEmptyTask(void)
220 {
221 //
222 // THE EXECUTION IS ALREADY IN SECURE SECTION.
223 // Just do what need to do.
224 //
225
226 // Clean up task queues
227 ScrubCompletedTasks();
228
229 // allocate one new task
230 if (nullptr == m_pFreeTasks)
231 {
232
233
234 // the maximum allowed number of tasks is reached
235 if (MFX_MAX_NUMBER_TASK <= m_taskCounter)
236 {
237 return MFX_WRN_DEVICE_BUSY;
238 }
239
240 // allocate one more task
241 try
242 {
243 m_pFreeTasks = new MFX_SCHEDULER_TASK(m_taskCounter++, this);
244 }
245 catch(...)
246 {
247 return MFX_WRN_DEVICE_BUSY;
248 }
249 // register the task in the look up table
250 m_ppTaskLookUpTable[m_pFreeTasks->taskID] = m_pFreeTasks;
251 }
252 memset(&(m_pFreeTasks->param), 0, sizeof(m_pFreeTasks->param));
253 // increment job number. This number must grow evenly.
254 // make job number 0 an invalid value to avoid problem with
255 // task number 0 with job number 0, which are NULL when being combined.
256 m_jobCounter += 1;
257 if (MFX_MAX_NUMBER_JOB <= m_jobCounter)
258 {
259 m_jobCounter = 1;
260 }
261 m_pFreeTasks->jobID = m_jobCounter;
262
263 return MFX_ERR_NONE;
264
265 } // mfxStatus mfxSchedulerCore::AllocateEmptyTask(void)
266
GetOccupancyTableIndex(mfxU32 & idx,const MFX_TASK * pTask)267 mfxStatus mfxSchedulerCore::GetOccupancyTableIndex(mfxU32 &idx,
268 const MFX_TASK *pTask)
269 {
270 mfxU32 i = 0;
271 MFX_THREAD_ASSIGNMENT *pAssignment = NULL;
272
273 //
274 // THE EXECUTION IS ALREADY IN SECURE SECTION.
275 // Just do what need to do.
276 //
277
278 // check the table, decrement the number of used entries
279 while ((m_numOccupancies) &&
280 (0 == m_occupancyTable[m_numOccupancies - 1].m_numRefs))
281 {
282 m_numOccupancies -= 1;
283 }
284
285 // find the existing element with the given pState and pRoutine
286 for (i = 0; i < m_numOccupancies; i += 1)
287 {
288 if ((m_occupancyTable[i].pState == pTask->entryPoint.pState) &&
289 (m_occupancyTable[i].pRoutine == pTask->entryPoint.pRoutine))
290 {
291 // check the type of other tasks using this table entry
292 if (m_occupancyTable[i].threadingPolicy != pTask->threadingPolicy)
293 {
294 return MFX_ERR_INVALID_VIDEO_PARAM;
295 }
296
297 pAssignment = &(m_occupancyTable[i]);
298 break;
299 }
300 }
301
302 // if the element exist, check the parameters for compatibility
303 if (pAssignment)
304 {
305 // actually, there is nothing to do
306 }
307 // allocate one more element in the array
308 else
309 {
310 for (i = 0; i < m_numOccupancies; i += 1)
311 {
312 if (0 == m_occupancyTable[i].m_numRefs)
313 {
314 break;
315 }
316 }
317 // we can't reallocate the table
318 if (m_occupancyTable.size() == i)
319 {
320 return MFX_WRN_DEVICE_BUSY;
321 }
322
323 pAssignment = &(m_occupancyTable[i]);
324
325 // fill the parameters
326 memset(pAssignment, 0, sizeof(MFX_THREAD_ASSIGNMENT));
327 pAssignment->pState = pTask->entryPoint.pState;
328 pAssignment->pRoutine = pTask->entryPoint.pRoutine;
329 pAssignment->threadingPolicy = pTask->threadingPolicy;
330 }
331
332 // update the number of allocated objects
333 m_numOccupancies = std::max(mfxU32(m_numOccupancies), i + 1);
334
335 // save the index to return
336 idx = i;
337
338 return MFX_ERR_NONE;
339
340 } // mfxStatus mfxSchedulerCore::GetOccupancyTableIndex(mfxU32 &idx,
341
ScrubCompletedTasks(bool bComprehensive)342 void mfxSchedulerCore::ScrubCompletedTasks(bool bComprehensive)
343 {
344 int priority;
345
346 //
347 // THE EXECUTION IS ALREADY IN SECURE SECTION.
348 // Just do what need to do.
349 //
350
351 for (priority = MFX_PRIORITY_HIGH;
352 priority >= MFX_PRIORITY_LOW;
353 priority -= 1)
354 {
355 int type;
356
357 for (type = MFX_TYPE_HARDWARE; type <= MFX_TYPE_SOFTWARE; type += 1)
358 {
359 MFX_SCHEDULER_TASK **ppCur;
360
361 // if there is an empty task, immediately return
362 if ((false == bComprehensive) &&
363 (m_pFreeTasks))
364 {
365 return;
366 }
367
368 ppCur = m_pTasks[priority] + type;
369 while (*ppCur)
370 {
371 // move task completed to the 'free' queue.
372 if (MFX_ERR_NONE == (*ppCur)->opRes)
373 {
374 MFX_SCHEDULER_TASK *pTemp;
375
376 // cut the task from the queue
377 pTemp = *ppCur;
378 *ppCur = pTemp->pNext;
379 // add it to the 'free' queue
380 pTemp->pNext = m_pFreeTasks;
381 m_pFreeTasks = pTemp;
382 }
383 // move task failed to the 'failed' queue.
384 else if ((MFX_ERR_NONE != (*ppCur)->opRes) &&
385 (MFX_WRN_IN_EXECUTION != (*ppCur)->opRes))
386 {
387 MFX_SCHEDULER_TASK *pTemp;
388
389 // cut the task from the queue
390 pTemp = *ppCur;
391 *ppCur = pTemp->pNext;
392 // add it to the 'failed' queue
393 pTemp->pNext = m_pFailedTasks;
394 m_pFailedTasks = pTemp;
395 }
396 else
397 {
398 // set the next task
399 ppCur = &((*ppCur)->pNext);
400 }
401 }
402 }
403 }
404
405 } // void mfxSchedulerCore::ScrubCompletedTasks(bool bComprehensive)
406
RegisterTaskDependencies(MFX_SCHEDULER_TASK * pTask)407 void mfxSchedulerCore::RegisterTaskDependencies(MFX_SCHEDULER_TASK *pTask)
408 {
409 mfxU32 i, tableIdx, remainInputs;
410 const void *pSrcCopy[MFX_TASK_NUM_DEPENDENCIES];
411 mfxStatus taskRes = MFX_WRN_IN_EXECUTION;
412
413 //
414 // THE EXECUTION IS ALREADY IN SECURE SECTION.
415 // Just do what need to do.
416 //
417
418 // check if the table have empty position(s),
419 // If so decrement the index of the last table entry.
420 if (m_pDependencyTable.size() > m_numDependencies)
421 {
422 auto it = std::find_if(m_pDependencyTable.rend() - m_numDependencies, m_pDependencyTable.rend(),
423 [](const MFX_DEPENDENCY_ITEM & item){ return item.p != nullptr; });
424
425 m_numDependencies = m_pDependencyTable.rend() - it;
426 }
427
428 // get the number of source dependencies
429 remainInputs = 0;
430 for (i = 0; i < MFX_TASK_NUM_DEPENDENCIES; i += 1)
431 {
432 // make a copy of source dependencies.
433 // source dependencies have to be swept, because of duplication in
434 // the dependency table. task will sync on the first matching entry.
435 pSrcCopy[i] = pTask->param.task.pSrc[i];
436 if (pSrcCopy[i])
437 {
438 remainInputs += 1;
439 }
440 }
441
442 // run over the table and save the handles of incomplete inputs
443 for (tableIdx = 0; tableIdx < m_numDependencies; tableIdx += 1)
444 {
445 // compare only filled table entries
446 if (m_pDependencyTable[tableIdx].p)
447 {
448 for (i = 0; i < MFX_TASK_NUM_DEPENDENCIES; i += 1)
449 {
450 // we found the source dependency,
451 // save the handle
452 if (m_pDependencyTable[tableIdx].p == pSrcCopy[i])
453 {
454 // dependency is fail. The dependency resolved, but failed.
455 if (MFX_WRN_IN_EXECUTION != m_pDependencyTable[tableIdx].mfxRes)
456 {
457 // waiting task inherits status from the parent task
458 // need to propogate error status to all dependent tasks.
459 //if (MFX_TASK_WAIT & pTask->param.task.threadingPolicy)
460 {
461 taskRes = m_pDependencyTable[tableIdx].mfxRes;
462 }
463 //// all other tasks are aborted
464 //else
465 //{
466 // taskRes = MFX_ERR_ABORTED;
467 //}
468 }
469 // link dependency
470 else
471 {
472 m_pDependencyTable[tableIdx].pTask->SetDependentItem(pTask, i);
473 }
474 // sweep already used dependency
475 pSrcCopy[i] = NULL;
476 remainInputs -= 1;
477 break;
478 }
479 }
480
481 // is there more source dependencies?
482 if (0 == remainInputs)
483 {
484 break;
485 }
486 }
487 }
488
489 // run over the table and register generated outputs
490 tableIdx = 0;
491 for (i = 0; i < MFX_TASK_NUM_DEPENDENCIES; i += 1)
492 {
493 if (pTask->param.task.pDst[i])
494 {
495 // find empty table entry
496 while (m_pDependencyTable.at(tableIdx).p)
497 {
498 tableIdx += 1;
499 }
500
501 // save the generated dependency
502 m_pDependencyTable[tableIdx].p = pTask->param.task.pDst[i];
503 m_pDependencyTable[tableIdx].mfxRes = taskRes;
504 m_pDependencyTable[tableIdx].pTask = pTask;
505
506 // save the index of the output
507 pTask->param.dependencies.dstIdx[i] = tableIdx;
508 tableIdx += 1;
509 }
510 }
511
512 // update the dependency table max index
513 if (tableIdx >= m_numDependencies)
514 {
515 m_numDependencies = tableIdx;
516 }
517
518 // if dependency were failed,
519 // set the task into the 'aborted' state
520 if (MFX_WRN_IN_EXECUTION != taskRes)
521 {
522 // save the status
523 m_pFreeTasks->curStatus = taskRes;
524 m_pFreeTasks->opRes = taskRes;
525 m_pFreeTasks->done.notify_all();
526 }
527
528 } // void mfxSchedulerCore::RegisterTaskDependencies(MFX_SCHEDULER_TASK *pTask)
529
530
531
532 //#define ENABLE_TASK_DEBUG
533
534
PrintTaskInfo(void)535 void mfxSchedulerCore::PrintTaskInfo(void)
536 {
537
538 } // void mfxSchedulerCore::PrintTaskInfo(void)
539
PrintTaskInfoUnsafe(void)540 void mfxSchedulerCore::PrintTaskInfoUnsafe(void)
541 {
542
543 } // void mfxSchedulerCore::PrintTaskInfoUnsafe(void)
544