// Copyright (c) 2013 Doug Binks
//
// This software is provided 'as-is', without any express or implied
// warranty. In no event will the authors be held liable for any damages
// arising from the use of this software.
//
// Permission is granted to anyone to use this software for any purpose,
// including commercial applications, and to alter it and redistribute it
// freely, subject to the following restrictions:
//
// 1. The origin of this software must not be misrepresented; you must not
//    claim that you wrote the original software. If you use this software
//    in a product, an acknowledgement in the product documentation would be
//    appreciated but is not required.
// 2. Altered source versions must be plainly marked as such, and must not be
//    misrepresented as being the original software.
// 3. This notice may not be removed or altered from any source distribution.

#include <assert.h>
#include <string.h> // for memset

#include "TaskScheduler.h"
#include "LockLessMultiReadPipe.h"

#if defined __i386__ || defined __x86_64__
#include <x86intrin.h>
#elif defined _WIN32
#include <intrin.h>
#endif

using namespace enki;


static const uint32_t PIPESIZE_LOG2              = 8;
static const uint32_t SPIN_COUNT                 = 100;
static const uint32_t SPIN_BACKOFF_MULTIPLIER    = 10;
static const uint32_t MAX_NUM_INITIAL_PARTITIONS = 8;

// each software thread gets its own copy of gtl_threadNum, so this is safe to use as a static variable
static THREAD_LOCAL uint32_t                             gtl_threadNum       = 0;

namespace enki
{
    struct SubTaskSet
    {
        ITaskSet*           pTask;
        TaskSetPartition    partition;
    };

    // we derive class TaskPipe rather than typedef to get forward declaration working easily
    class TaskPipe : public LockLessMultiReadPipe<PIPESIZE_LOG2,enki::SubTaskSet> {};

    struct ThreadArgs
    {
        uint32_t        threadNum;
        TaskScheduler*  pTaskScheduler;
    };

    class PinnedTaskList : public LocklessMultiWriteIntrusiveList<IPinnedTask> {};
}

namespace
{
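    // Splits off the first rangeToSplit_ (or fewer, if less remains) iterations of subTask_
    // into the returned SubTaskSet, and advances subTask_.partition.start past the split range.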
    SubTaskSet       SplitTask( SubTaskSet& subTask_, uint32_t rangeToSplit_ )
    {
        SubTaskSet splitTask = subTask_;
        uint32_t rangeLeft = subTask_.partition.end - subTask_.partition.start;

        if( rangeToSplit_ > rangeLeft )
        {
            rangeToSplit_ = rangeLeft;
        }
        splitTask.partition.end = subTask_.partition.start + rangeToSplit_;
        subTask_.partition.start = splitTask.partition.end;
        return splitTask;
    }

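    // Busy-wait for approximately spinCount_ cycles. On x86 this uses the timestamp counter
    // and _mm_pause to be polite to a sibling hyperthread; elsewhere it falls back to a
    // simple countdown loop.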
    #if ( defined _WIN32 && ( defined _M_IX86  || defined _M_X64 ) ) || ( defined __i386__ || defined __x86_64__ )
    static void SpinWait( uint32_t spinCount_ )
    {
        uint64_t end = __rdtsc() + spinCount_;
        while( __rdtsc() < end )
        {
            _mm_pause();
        }
    }
    #else
    static void SpinWait( uint32_t spinCount_ )
    {
        while( spinCount_ )
        {
            // TODO: may have NOP or yield equiv
            --spinCount_;
        }
    }
    #endif

}


static void SafeCallback(ProfilerCallbackFunc func_, uint32_t threadnum_)
{
    if( func_ )
    {
        func_(threadnum_);
    }
}

ProfilerCallbacks* TaskScheduler::GetProfilerCallbacks()
{
    return &m_ProfilerCallbacks;
}

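// Entry point for each worker thread. Stores the thread index in the thread-local
// gtl_threadNum, then loops: try to run a task; if none is available, spin with an
// increasing backoff and eventually block on the new-task semaphore via WaitForTasks.
// On shutdown it decrements m_NumThreadsRunning and returns.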
THREADFUNC_DECL TaskScheduler::TaskingThreadFunction( void* pArgs )
{
    ThreadArgs args     = *(ThreadArgs*)pArgs;
    uint32_t threadNum  = args.threadNum;
    TaskScheduler* pTS  = args.pTaskScheduler;
    gtl_threadNum       = threadNum;

    SafeCallback( pTS->m_ProfilerCallbacks.threadStart, threadNum );

    uint32_t spinCount = SPIN_COUNT + 1;
    uint32_t hintPipeToCheck_io = threadNum + 1;    // does not need to be clamped.
    while( pTS->m_bRunning )
    {
        if( !pTS->TryRunTask( threadNum, hintPipeToCheck_io ) )
        {
            // no tasks, will spin then wait
            ++spinCount;
            if( spinCount > SPIN_COUNT )
            {
                pTS->WaitForTasks( threadNum );
                spinCount = 0;
            }
            else
            {
                // Note: see https://software.intel.com/en-us/articles/a-common-construct-to-avoid-the-contention-of-threads-architecture-agnostic-spin-wait-loops
                uint32_t spinBackoffCount = spinCount * SPIN_BACKOFF_MULTIPLIER;
                SpinWait( spinBackoffCount );
            }
        }
        else
        {
            spinCount = 0;
        }
    }

    AtomicAdd( &pTS->m_NumThreadsRunning, -1 );
    SafeCallback( pTS->m_ProfilerCallbacks.threadStop, threadNum );

    return 0;
}


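// Creates the worker threads (thread 0 is the calling/main thread, so m_NumThreads - 1
// threads are launched) and sets the partition counts used by AddTaskSetToPipe.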
void TaskScheduler::StartThreads()
{
    if( m_bHaveThreads )
    {
        return;
    }
    m_bRunning = true;

    SemaphoreCreate( m_NewTaskSemaphore );

    // we create one less thread than m_NumThreads as the main thread counts as one
    m_pThreadArgStore = new ThreadArgs[m_NumThreads];
    m_pThreadIDs      = new threadid_t[m_NumThreads];
    m_pThreadArgStore[0].threadNum      = 0;
    m_pThreadArgStore[0].pTaskScheduler = this;
    m_pThreadIDs[0] = 0;
    m_NumThreadsWaiting = 0;
    m_NumThreadsRunning = 1;    // account for main thread
    for( uint32_t thread = 1; thread < m_NumThreads; ++thread )
    {
        m_pThreadArgStore[thread].threadNum      = thread;
        m_pThreadArgStore[thread].pTaskScheduler = this;
        ThreadCreate( &m_pThreadIDs[thread], TaskingThreadFunction, &m_pThreadArgStore[thread] );
        ++m_NumThreadsRunning;
    }

    // ensure we have sufficient partitions to evenly fill either all threads including main,
    // or just the threads we've launched. This is outside the first init as we want to be
    // able to change it at runtime.
    if( 1 == m_NumThreads )
    {
        m_NumPartitions        = 1;
        m_NumInitialPartitions = 1;
    }
    else
    {
        m_NumPartitions        = m_NumThreads * (m_NumThreads - 1);
        m_NumInitialPartitions = m_NumThreads - 1;
        if( m_NumInitialPartitions > MAX_NUM_INITIAL_PARTITIONS )
        {
            m_NumInitialPartitions = MAX_NUM_INITIAL_PARTITIONS;
        }
    }

    m_bHaveThreads = true;
}

void TaskScheduler::StopThreads( bool bWait_ )
{
    if( m_bHaveThreads )
    {
        // wait for the threads to quit before deleting data
        m_bRunning = false;
        while( bWait_ && m_NumThreadsRunning > 1 )
        {
            // keep firing event to ensure all threads pick up state of m_bRunning
            SemaphoreSignal( m_NewTaskSemaphore, m_NumThreadsRunning );
        }

        for( uint32_t thread = 1; thread < m_NumThreads; ++thread )
        {
            ThreadTerminate( m_pThreadIDs[thread] );
        }

        m_NumThreads = 0;
        delete[] m_pThreadArgStore;
        delete[] m_pThreadIDs;
        m_pThreadArgStore = 0;
        m_pThreadIDs = 0;
        SemaphoreClose( m_NewTaskSemaphore );

        m_bHaveThreads = false;
        m_NumThreadsWaiting = 0;
        m_NumThreadsRunning = 0;
    }
}

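// Attempts to run one unit of work on thread threadNum. Pinned tasks for this thread are
// run first; then the thread's own pipe is checked, and if that is empty the other threads'
// pipes are polled for work to steal, starting at hintPipeToCheck_io_. If the sub-task found
// covers more than its task's m_RangeToRun, a piece is split off to run now and the remainder
// is re-queued via SplitAndAddTask. Returns true if a sub-task was found and executed.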
bool TaskScheduler::TryRunTask( uint32_t threadNum, uint32_t& hintPipeToCheck_io_ )
{
    // Run any pinned tasks for this thread
    RunPinnedTasks( threadNum );

    // check for tasks
    SubTaskSet subTask;
    bool bHaveTask = m_pPipesPerThread[ threadNum ].WriterTryReadFront( &subTask );

    uint32_t threadToCheck = hintPipeToCheck_io_;
    uint32_t checkCount = 0;
    while( !bHaveTask && checkCount < m_NumThreads )
    {
        threadToCheck = ( hintPipeToCheck_io_ + checkCount ) % m_NumThreads;
        if( threadToCheck != threadNum )
        {
            bHaveTask = m_pPipesPerThread[ threadToCheck ].ReaderTryReadBack( &subTask );
        }
        ++checkCount;
    }

    if( bHaveTask )
    {
        // update hint; the value is preserved unless we actually got a task from another thread.
        hintPipeToCheck_io_ = threadToCheck;

        uint32_t partitionSize = subTask.partition.end - subTask.partition.start;
        if( subTask.pTask->m_RangeToRun < partitionSize )
        {
            SubTaskSet taskToRun = SplitTask( subTask, subTask.pTask->m_RangeToRun );
            SplitAndAddTask( threadNum, subTask, subTask.pTask->m_RangeToRun );
            taskToRun.pTask->ExecuteRange( taskToRun.partition, threadNum );
            AtomicAdd( &taskToRun.pTask->m_RunningCount, -1 );
        }
        else
        {
            // the task has already been divided up by AddTaskSetToPipe, so just run it
            subTask.pTask->ExecuteRange( subTask.partition, threadNum );
            AtomicAdd( &subTask.pTask->m_RunningCount, -1 );
        }
    }

    return bHaveTask;
}

void TaskScheduler::WaitForTasks( uint32_t threadNum )
{
    // We increment the number of waiting threads here, before the check for tasks,
    // to prevent the race where a task is added after our check but before we wait.
    // This will occasionally result in threads being mistakenly awoken,
    // but they will then go back to sleep.
    AtomicAdd( &m_NumThreadsWaiting, 1 );

    bool bHaveTasks = false;
    for( uint32_t thread = 0; thread < m_NumThreads; ++thread )
    {
        if( !m_pPipesPerThread[ thread ].IsPipeEmpty() )
        {
            bHaveTasks = true;
            break;
        }
    }
    if( !bHaveTasks && !m_pPinnedTaskListPerThread[ threadNum ].IsListEmpty() )
    {
        bHaveTasks = true;
    }
    if( !bHaveTasks )
    {
        SafeCallback( m_ProfilerCallbacks.waitStart, threadNum );
        SemaphoreWait( m_NewTaskSemaphore );
        SafeCallback( m_ProfilerCallbacks.waitStop, threadNum );
    }

    AtomicAdd( &m_NumThreadsWaiting, -1 );
}

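// Wakes waiting threads by signalling the new-task semaphore: if maxToWake_ is positive and
// less than the number of waiting threads, only that many are signalled; otherwise all
// currently waiting threads are.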
void TaskScheduler::WakeThreads( int32_t maxToWake_ )
{
    if( maxToWake_ > 0 && maxToWake_ < m_NumThreadsWaiting )
    {
        SemaphoreSignal( m_NewTaskSemaphore, maxToWake_ );
    }
    else
    {
        SemaphoreSignal( m_NewTaskSemaphore, m_NumThreadsWaiting );
    }
}

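// Splits subTask_ into chunks of at most rangeToSplit_ iterations and pushes each chunk onto
// this thread's pipe, waking one thread per successful push. If the pipe is full, the chunk
// is executed inline instead (clamped to m_RangeToRun), so forward progress is always made.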
void TaskScheduler::SplitAndAddTask( uint32_t threadNum_, SubTaskSet subTask_, uint32_t rangeToSplit_ )
{
    while( subTask_.partition.start != subTask_.partition.end )
    {
        SubTaskSet taskToAdd = SplitTask( subTask_, rangeToSplit_ );

        // add the partition to the pipe
        AtomicAdd( &subTask_.pTask->m_RunningCount, 1 );
        if( !m_pPipesPerThread[ threadNum_ ].WriterTryWriteFront( taskToAdd ) )
        {
            // pipe is full; alter range to run the appropriate fraction and execute it here
            if( taskToAdd.pTask->m_RangeToRun < rangeToSplit_ )
            {
                taskToAdd.partition.end = taskToAdd.partition.start + taskToAdd.pTask->m_RangeToRun;
                subTask_.partition.start = taskToAdd.partition.end;
            }
            taskToAdd.pTask->ExecuteRange( taskToAdd.partition, threadNum_ );
            AtomicAdd( &subTask_.pTask->m_RunningCount, -1 );
        }
        else
        {
            WakeThreads( 1 );
        }
    }
}

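// Adds a task set to the calling thread's pipe, pre-split into m_NumInitialPartitions chunks;
// further splitting down to m_RangeToRun happens in TryRunTask as threads pick the chunks up.
// Typical usage is a sketch along these lines (ParallelSum is a hypothetical user task, not
// part of this file; see ITaskSet in TaskScheduler.h for the exact interface):
//
//     struct ParallelSum : enki::ITaskSet
//     {
//         ParallelSum( uint32_t size_ ) : ITaskSet( size_ ) {}
//         virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t threadnum_ )
//         {
//             for( uint32_t i = range.start; i < range.end; ++i ) { /* process element i */ }
//         }
//     };
//
//     enki::TaskScheduler ts;
//     ts.Initialize();
//     ParallelSum task( 1024 );
//     ts.AddTaskSetToPipe( &task );
//     ts.WaitforTask( &task );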
void    TaskScheduler::AddTaskSetToPipe( ITaskSet* pTaskSet )
{
    pTaskSet->m_RunningCount = 0;

    // divide task up and add to pipe
    pTaskSet->m_RangeToRun = pTaskSet->m_SetSize / m_NumPartitions;
    if( pTaskSet->m_RangeToRun < pTaskSet->m_MinRange ) { pTaskSet->m_RangeToRun = pTaskSet->m_MinRange; }

    uint32_t rangeToSplit = pTaskSet->m_SetSize / m_NumInitialPartitions;
    if( rangeToSplit < pTaskSet->m_MinRange ) { rangeToSplit = pTaskSet->m_MinRange; }

    SubTaskSet subTask;
    subTask.pTask = pTaskSet;
    subTask.partition.start = 0;
    subTask.partition.end = pTaskSet->m_SetSize;
    SplitAndAddTask( gtl_threadNum, subTask, rangeToSplit );
}

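// Pinned tasks run only on the thread given by pTask_->threadNum. The task is pushed onto
// that thread's pinned task list and waiting threads are woken; m_RunningCount is set back
// to 0 by RunPinnedTasks once the task has executed.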
void TaskScheduler::AddPinnedTask( IPinnedTask* pTask_ )
{
    pTask_->m_RunningCount = 1;
    m_pPinnedTaskListPerThread[ pTask_->threadNum ].WriterWriteFront( pTask_ );
    WakeThreads();
}

void TaskScheduler::RunPinnedTasks()
{
    uint32_t threadNum = gtl_threadNum;
    RunPinnedTasks( threadNum );
}

void TaskScheduler::RunPinnedTasks( uint32_t threadNum )
{
    IPinnedTask* pPinnedTaskSet = NULL;
    do
    {
        pPinnedTaskSet = m_pPinnedTaskListPerThread[ threadNum ].ReaderReadBack();
        if( pPinnedTaskSet )
        {
            pPinnedTaskSet->Execute();
            pPinnedTaskSet->m_RunningCount = 0;
        }
    } while( pPinnedTaskSet );
}

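// Blocks until pCompletable_ has completed (its m_RunningCount reaches 0), using the calling
// thread to run queued tasks while it waits. Passing NULL simply tries to run one task.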
void    TaskScheduler::WaitforTask( const ICompletable* pCompletable_ )
{
    uint32_t hintPipeToCheck_io = gtl_threadNum + 1;    // does not need to be clamped.
    if( pCompletable_ )
    {
        while( pCompletable_->m_RunningCount )
        {
            TryRunTask( gtl_threadNum, hintPipeToCheck_io );
            // should add a spin then wait for task completion event.
        }
    }
    else
    {
        TryRunTask( gtl_threadNum, hintPipeToCheck_io );
    }
}

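// Runs tasks on the calling thread until every pipe is empty and all other running threads
// are waiting on the semaphore, i.e. until all queued work has completed.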
void    TaskScheduler::WaitforAll()
{
    bool bHaveTasks = true;
    uint32_t hintPipeToCheck_io = gtl_threadNum + 1;    // does not need to be clamped.
    int32_t threadsRunning = m_NumThreadsRunning - 1;
    while( bHaveTasks || m_NumThreadsWaiting < threadsRunning )
    {
        bHaveTasks = TryRunTask( gtl_threadNum, hintPipeToCheck_io );
        if( !bHaveTasks )
        {
            for( uint32_t thread = 0; thread < m_NumThreads; ++thread )
            {
                if( !m_pPipesPerThread[ thread ].IsPipeEmpty() )
                {
                    bHaveTasks = true;
                    break;
                }
            }
        }
    }
}

void    TaskScheduler::WaitforAllAndShutdown()
{
    WaitforAll();
    StopThreads( true );
    delete[] m_pPipesPerThread;
    m_pPipesPerThread = 0;

    delete[] m_pPinnedTaskListPerThread;
    m_pPinnedTaskListPerThread = 0;
}

uint32_t        TaskScheduler::GetNumTaskThreads() const
{
    return m_NumThreads;
}

TaskScheduler::TaskScheduler()
        : m_pPipesPerThread(NULL)
        , m_pPinnedTaskListPerThread(NULL)
        , m_NumThreads(0)
        , m_pThreadArgStore(NULL)
        , m_pThreadIDs(NULL)
        , m_bRunning(false)
        , m_NumThreadsRunning(0)
        , m_NumThreadsWaiting(0)
        , m_NumPartitions(0)
        , m_bHaveThreads(false)
{
    memset(&m_ProfilerCallbacks, 0, sizeof(m_ProfilerCallbacks));
}

TaskScheduler::~TaskScheduler()
{
#ifndef _WIN32
    WaitforAllAndShutdown();
#endif
}

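// (Re)initializes the scheduler for numThreads_ threads (including the calling thread):
// any existing threads are stopped first (waiting for them to exit), per-thread pipes and
// pinned task lists are reallocated, and numThreads_ - 1 worker threads are started.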
void    TaskScheduler::Initialize( uint32_t numThreads_ )
{
    assert( numThreads_ );
    StopThreads( true ); // Stops threads, waiting for them.
    delete[] m_pPipesPerThread;
    delete[] m_pPinnedTaskListPerThread;

    m_NumThreads = numThreads_;

    m_pPipesPerThread          = new TaskPipe[ m_NumThreads ];
    m_pPinnedTaskListPerThread = new PinnedTaskList[ m_NumThreads ];

    StartThreads();
}

void   TaskScheduler::Initialize()
{
    Initialize( GetNumHardwareThreads() );
}