// Copyright (c) 2013 Doug Binks
//
// This software is provided 'as-is', without any express or implied
// warranty. In no event will the authors be held liable for any damages
// arising from the use of this software.
//
// Permission is granted to anyone to use this software for any purpose,
// including commercial applications, and to alter it and redistribute it
// freely, subject to the following restrictions:
//
// 1. The origin of this software must not be misrepresented; you must not
//    claim that you wrote the original software. If you use this software
//    in a product, an acknowledgement in the product documentation would be
//    appreciated but is not required.
// 2. Altered source versions must be plainly marked as such, and must not be
//    misrepresented as being the original software.
// 3. This notice may not be removed or altered from any source distribution.

#include <assert.h>

#include "TaskScheduler.h"
#include "LockLessMultiReadPipe.h"

#if defined __i386__ || defined __x86_64__
#include "x86intrin.h"
#elif defined _WIN32
#include <intrin.h>
#endif

using namespace enki;


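// PIPESIZE_LOG2 sets each per-thread task pipe's capacity to 2^PIPESIZE_LOG2 sub-tasks.
// SPIN_COUNT is the number of failed task-grab attempts before a worker waits on the
// semaphore, and SPIN_BACKOFF_MULTIPLIER scales the busy-wait between attempts.
// MAX_NUM_INITIAL_PARTITIONS caps m_NumInitialPartitions, set in StartThreads().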
static const uint32_t PIPESIZE_LOG2              = 8;
static const uint32_t SPIN_COUNT                 = 100;
static const uint32_t SPIN_BACKOFF_MULTIPLIER    = 10;
static const uint32_t MAX_NUM_INITIAL_PARTITIONS = 8;

// each software thread gets its own copy of gtl_threadNum, so it is safe to use as a static variable
static THREAD_LOCAL uint32_t gtl_threadNum = 0;

namespace enki
{
    struct SubTaskSet
    {
        ITaskSet*        pTask;
        TaskSetPartition partition;
    };

    // we derive class TaskPipe rather than typedef to get forward declaration working easily
    class TaskPipe : public LockLessMultiReadPipe<PIPESIZE_LOG2,enki::SubTaskSet> {};

    struct ThreadArgs
    {
        uint32_t       threadNum;
        TaskScheduler* pTaskScheduler;
    };

    class PinnedTaskList : public LocklessMultiWriteIntrusiveList<IPinnedTask> {};
}

namespace
{
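    // Carve rangeToSplit_ items off the front of subTask_ and return them as a new
    // SubTaskSet; subTask_ is adjusted in place to hold the remainder. If fewer than
    // rangeToSplit_ items remain, the whole remaining range is returned.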
    SubTaskSet SplitTask( SubTaskSet& subTask_, uint32_t rangeToSplit_ )
    {
        SubTaskSet splitTask = subTask_;
        uint32_t rangeLeft = subTask_.partition.end - subTask_.partition.start;

        if( rangeToSplit_ > rangeLeft )
        {
            rangeToSplit_ = rangeLeft;
        }
        splitTask.partition.end = subTask_.partition.start + rangeToSplit_;
        subTask_.partition.start = splitTask.partition.end;
        return splitTask;
    }

    #if ( defined _WIN32 && ( defined _M_IX86 || defined _M_X64 ) ) || ( defined __i386__ || defined __x86_64__ )
    static void SpinWait( uint32_t spinCount_ )
    {
        uint64_t end = __rdtsc() + spinCount_;
        while( __rdtsc() < end )
        {
            _mm_pause();
        }
    }
    #else
    static void SpinWait( uint32_t spinCount_ )
    {
        while( spinCount_ )
        {
            // TODO: may have NOP or yield equiv
            --spinCount_;
        }
    }
    #endif


}


static void SafeCallback(ProfilerCallbackFunc func_, uint32_t threadnum_)
{
    if( func_ )
    {
        func_(threadnum_);
    }
}

ProfilerCallbacks* TaskScheduler::GetProfilerCallbacks()
{
    return &m_ProfilerCallbacks;
}

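// Worker thread entry point: repeatedly try to run tasks. When no task is available the
// thread spins with a backoff proportional to the number of consecutive failures, and once
// that count exceeds SPIN_COUNT it waits on the new-task semaphore.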
THREADFUNC_DECL TaskScheduler::TaskingThreadFunction( void* pArgs )
{
    ThreadArgs args = *(ThreadArgs*)pArgs;
    uint32_t threadNum = args.threadNum;
    TaskScheduler* pTS = args.pTaskScheduler;
    gtl_threadNum = threadNum;

    SafeCallback( pTS->m_ProfilerCallbacks.threadStart, threadNum );

    uint32_t spinCount = SPIN_COUNT + 1;
    uint32_t hintPipeToCheck_io = threadNum + 1;    // does not need to be clamped.
    while( pTS->m_bRunning )
    {
        if( !pTS->TryRunTask( threadNum, hintPipeToCheck_io ) )
        {
            // no tasks, will spin then wait
            ++spinCount;
            if( spinCount > SPIN_COUNT )
            {
                pTS->WaitForTasks( threadNum );
                spinCount = 0;
            }
            else
            {
                // Note: see https://software.intel.com/en-us/articles/a-common-construct-to-avoid-the-contention-of-threads-architecture-agnostic-spin-wait-loops
                uint32_t spinBackoffCount = spinCount * SPIN_BACKOFF_MULTIPLIER;
                SpinWait( spinBackoffCount );
            }
        }
        else
        {
            spinCount = 0;
        }
    }

    AtomicAdd( &pTS->m_NumThreadsRunning, -1 );
    SafeCallback( pTS->m_ProfilerCallbacks.threadStop, threadNum );

    return 0;
}


void TaskScheduler::StartThreads()
{
    if( m_bHaveThreads )
    {
        return;
    }
    m_bRunning = true;

    SemaphoreCreate( m_NewTaskSemaphore );

    // we create one fewer thread than m_NumThreads as the main thread counts as one
    m_pThreadArgStore = new ThreadArgs[m_NumThreads];
    m_pThreadIDs      = new threadid_t[m_NumThreads];
    m_pThreadArgStore[0].threadNum      = 0;
    m_pThreadArgStore[0].pTaskScheduler = this;
    m_pThreadIDs[0] = 0;
    m_NumThreadsWaiting = 0;
    m_NumThreadsRunning = 1;    // account for main thread
    for( uint32_t thread = 1; thread < m_NumThreads; ++thread )
    {
        m_pThreadArgStore[thread].threadNum      = thread;
        m_pThreadArgStore[thread].pTaskScheduler = this;
        ThreadCreate( &m_pThreadIDs[thread], TaskingThreadFunction, &m_pThreadArgStore[thread] );
        ++m_NumThreadsRunning;
    }

    // ensure we have sufficient tasks to equally fill either all threads including main
    // or just the threads we've launched; this is outside the first init as we want to be
    // able to change it at runtime
    if( 1 == m_NumThreads )
    {
        m_NumPartitions        = 1;
        m_NumInitialPartitions = 1;
    }
    else
    {
        m_NumPartitions        = m_NumThreads * (m_NumThreads - 1);
        m_NumInitialPartitions = m_NumThreads - 1;
        if( m_NumInitialPartitions > MAX_NUM_INITIAL_PARTITIONS )
        {
            m_NumInitialPartitions = MAX_NUM_INITIAL_PARTITIONS;
        }
    }

    m_bHaveThreads = true;
}

void TaskScheduler::StopThreads( bool bWait_ )
{
    if( m_bHaveThreads )
    {
        // wait for the threads to quit before deleting data
        m_bRunning = false;
        while( bWait_ && m_NumThreadsRunning > 1 )
        {
            // keep firing event to ensure all threads pick up state of m_bRunning
            SemaphoreSignal( m_NewTaskSemaphore, m_NumThreadsRunning );
        }

        for( uint32_t thread = 1; thread < m_NumThreads; ++thread )
        {
            ThreadTerminate( m_pThreadIDs[thread] );
        }

        m_NumThreads = 0;
        delete[] m_pThreadArgStore;
        delete[] m_pThreadIDs;
        m_pThreadArgStore = 0;
        m_pThreadIDs      = 0;
        SemaphoreClose( m_NewTaskSemaphore );

        m_bHaveThreads      = false;
        m_NumThreadsWaiting = 0;
        m_NumThreadsRunning = 0;
    }
}

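// Try to execute one sub-task on this thread: pinned tasks for this thread are run first,
// then the thread's own pipe is checked, and finally other threads' pipes are checked
// (work stealing), starting from the hinted pipe. If the sub-task found is larger than its
// m_RangeToRun it is split again before the first piece is executed.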
bool TaskScheduler::TryRunTask( uint32_t threadNum, uint32_t& hintPipeToCheck_io_ )
{
    // Run any tasks for this thread
    RunPinnedTasks( threadNum );

    // check for tasks
    SubTaskSet subTask;
    bool bHaveTask = m_pPipesPerThread[ threadNum ].WriterTryReadFront( &subTask );

    uint32_t threadToCheck = hintPipeToCheck_io_;
    uint32_t checkCount = 0;
    while( !bHaveTask && checkCount < m_NumThreads )
    {
        threadToCheck = ( hintPipeToCheck_io_ + checkCount ) % m_NumThreads;
        if( threadToCheck != threadNum )
        {
            bHaveTask = m_pPipesPerThread[ threadToCheck ].ReaderTryReadBack( &subTask );
        }
        ++checkCount;
    }

    if( bHaveTask )
    {
        // update hint; preserves the existing value unless we actually took a task from another thread.
        hintPipeToCheck_io_ = threadToCheck;

        uint32_t partitionSize = subTask.partition.end - subTask.partition.start;
        if( subTask.pTask->m_RangeToRun < partitionSize )
        {
            SubTaskSet taskToRun = SplitTask( subTask, subTask.pTask->m_RangeToRun );
            SplitAndAddTask( threadNum, subTask, subTask.pTask->m_RangeToRun );
            taskToRun.pTask->ExecuteRange( taskToRun.partition, threadNum );
            AtomicAdd( &taskToRun.pTask->m_RunningCount, -1 );
        }
        else
        {
            // the task has already been divided up by AddTaskSetToPipe, so just run it
            subTask.pTask->ExecuteRange( subTask.partition, threadNum );
            AtomicAdd( &subTask.pTask->m_RunningCount, -1 );
        }
    }

    return bHaveTask;
}

void TaskScheduler::WaitForTasks( uint32_t threadNum )
{
    // We increment the number of threads waiting here in order
    // to ensure that the check for tasks occurs after the increment,
    // to prevent a task being added after the check while this thread then waits.
    // This will occasionally result in threads being mistakenly awoken,
    // but they will then go back to sleep.
    AtomicAdd( &m_NumThreadsWaiting, 1 );

    bool bHaveTasks = false;
    for( uint32_t thread = 0; thread < m_NumThreads; ++thread )
    {
        if( !m_pPipesPerThread[ thread ].IsPipeEmpty() )
        {
            bHaveTasks = true;
            break;
        }
    }
    if( !bHaveTasks && !m_pPinnedTaskListPerThread[ threadNum ].IsListEmpty() )
    {
        bHaveTasks = true;
    }
    if( !bHaveTasks )
    {
        SafeCallback( m_ProfilerCallbacks.waitStart, threadNum );
        SemaphoreWait( m_NewTaskSemaphore );
        SafeCallback( m_ProfilerCallbacks.waitStop, threadNum );
    }

    AtomicAdd( &m_NumThreadsWaiting, -1 );
}

void TaskScheduler::WakeThreads( int32_t maxToWake_ )
{
    if( maxToWake_ > 0 && maxToWake_ < m_NumThreadsWaiting )
    {
        SemaphoreSignal( m_NewTaskSemaphore, maxToWake_ );
    }
    else
    {
        SemaphoreSignal( m_NewTaskSemaphore, m_NumThreadsWaiting );
    }
}

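// Split subTask_ into pieces of at most rangeToSplit_ items and push each piece onto this
// thread's pipe, waking one waiting thread per piece added. If the pipe is full the piece
// is executed inline on this thread instead (clamped to m_RangeToRun) so progress is
// always made.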
void TaskScheduler::SplitAndAddTask( uint32_t threadNum_, SubTaskSet subTask_, uint32_t rangeToSplit_ )
{
    while( subTask_.partition.start != subTask_.partition.end )
    {
        SubTaskSet taskToAdd = SplitTask( subTask_, rangeToSplit_ );

        // add the partition to the pipe
        AtomicAdd( &subTask_.pTask->m_RunningCount, 1 );
        if( !m_pPipesPerThread[ threadNum_ ].WriterTryWriteFront( taskToAdd ) )
        {
            // alter range to run the appropriate fraction
            if( taskToAdd.pTask->m_RangeToRun < rangeToSplit_ )
            {
                taskToAdd.partition.end = taskToAdd.partition.start + taskToAdd.pTask->m_RangeToRun;
                subTask_.partition.start = taskToAdd.partition.end;
            }
            taskToAdd.pTask->ExecuteRange( taskToAdd.partition, threadNum_ );
            AtomicAdd( &subTask_.pTask->m_RunningCount, -1 );
        }
        else
        {
            WakeThreads( 1 );
        }
    }
}

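// Usage sketch for AddTaskSetToPipe / WaitforTask (illustrative only, not compiled as part
// of this translation unit; ParallelSumTask and its members are hypothetical, and it
// assumes the ITaskSet interface declared in TaskScheduler.h):
//
//     struct ParallelSumTask : public enki::ITaskSet
//     {
//         uint64_t m_Sums[ 64 ];    // one accumulator per thread to avoid sharing; sized for up to 64 threads
//
//         virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t threadnum )
//         {
//             for( uint32_t i = range.start; i < range.end; ++i )
//             {
//                 m_Sums[ threadnum ] += i;
//             }
//         }
//     };
//
//     enki::TaskScheduler ts;
//     ts.Initialize();                                    // starts workers; main thread counts as one
//     ParallelSumTask task;
//     memset( task.m_Sums, 0, sizeof(task.m_Sums) );
//     task.m_SetSize = 1024 * 1024;                       // items in the range [0, m_SetSize)
//     ts.AddTaskSetToPipe( &task );                       // splits the set and wakes worker threads
//     ts.WaitforTask( &task );                            // helps run tasks until the set completes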
void TaskScheduler::AddTaskSetToPipe( ITaskSet* pTaskSet )
{
    pTaskSet->m_RunningCount = 0;

    // divide task up and add to pipe
    pTaskSet->m_RangeToRun = pTaskSet->m_SetSize / m_NumPartitions;
    if( pTaskSet->m_RangeToRun < pTaskSet->m_MinRange ) { pTaskSet->m_RangeToRun = pTaskSet->m_MinRange; }

    uint32_t rangeToSplit = pTaskSet->m_SetSize / m_NumInitialPartitions;
    if( rangeToSplit < pTaskSet->m_MinRange ) { rangeToSplit = pTaskSet->m_MinRange; }

    SubTaskSet subTask;
    subTask.pTask = pTaskSet;
    subTask.partition.start = 0;
    subTask.partition.end   = pTaskSet->m_SetSize;
    SplitAndAddTask( gtl_threadNum, subTask, rangeToSplit );
}

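// Pinned tasks are only ever executed by the thread given in pTask_->threadNum. The task
// is pushed onto that thread's pinned-task list and waiting threads are woken so the
// target thread can pick it up in RunPinnedTasks().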
void TaskScheduler::AddPinnedTask( IPinnedTask* pTask_ )
{
    pTask_->m_RunningCount = 1;
    m_pPinnedTaskListPerThread[ pTask_->threadNum ].WriterWriteFront( pTask_ );
    WakeThreads();
}

void TaskScheduler::RunPinnedTasks()
{
    uint32_t threadNum = gtl_threadNum;
    RunPinnedTasks( threadNum );
}

void TaskScheduler::RunPinnedTasks( uint32_t threadNum )
{
    IPinnedTask* pPinnedTaskSet = NULL;
    do
    {
        pPinnedTaskSet = m_pPinnedTaskListPerThread[ threadNum ].ReaderReadBack();
        if( pPinnedTaskSet )
        {
            pPinnedTaskSet->Execute();
            pPinnedTaskSet->m_RunningCount = 0;
        }
    } while( pPinnedTaskSet );
}

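// Wait for pCompletable_ to finish, helping to run tasks on this thread rather than
// blocking while its m_RunningCount is non-zero. Passing NULL simply attempts to run one
// task on the calling thread.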
void TaskScheduler::WaitforTask( const ICompletable* pCompletable_ )
{
    uint32_t hintPipeToCheck_io = gtl_threadNum + 1;    // does not need to be clamped.
    if( pCompletable_ )
    {
        while( pCompletable_->m_RunningCount )
        {
            TryRunTask( gtl_threadNum, hintPipeToCheck_io );
            // should add a spin then wait for task completion event.
        }
    }
    else
    {
        TryRunTask( gtl_threadNum, hintPipeToCheck_io );
    }
}

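// Run tasks on the calling thread until every pipe is empty and all other running threads
// are waiting, i.e. until there is no outstanding work anywhere in the scheduler.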
void TaskScheduler::WaitforAll()
{
    bool bHaveTasks = true;
    uint32_t hintPipeToCheck_io = gtl_threadNum + 1;    // does not need to be clamped.
    int32_t threadsRunning = m_NumThreadsRunning - 1;
    while( bHaveTasks || m_NumThreadsWaiting < threadsRunning )
    {
        bHaveTasks = TryRunTask( gtl_threadNum, hintPipeToCheck_io );
        if( !bHaveTasks )
        {
            for( uint32_t thread = 0; thread < m_NumThreads; ++thread )
            {
                if( !m_pPipesPerThread[ thread ].IsPipeEmpty() )
                {
                    bHaveTasks = true;
                    break;
                }
            }
        }
    }
}

void TaskScheduler::WaitforAllAndShutdown()
{
    WaitforAll();
    StopThreads( true );
    delete[] m_pPipesPerThread;
    m_pPipesPerThread = 0;

    delete[] m_pPinnedTaskListPerThread;
    m_pPinnedTaskListPerThread = 0;
}

uint32_t TaskScheduler::GetNumTaskThreads() const
{
    return m_NumThreads;
}

TaskScheduler::TaskScheduler()
    : m_pPipesPerThread(NULL)
    , m_pPinnedTaskListPerThread(NULL)
    , m_NumThreads(0)
    , m_pThreadArgStore(NULL)
    , m_pThreadIDs(NULL)
    , m_bRunning(false)
    , m_NumThreadsRunning(0)
    , m_NumThreadsWaiting(0)
    , m_NumPartitions(0)
    , m_bHaveThreads(false)
{
    memset(&m_ProfilerCallbacks, 0, sizeof(m_ProfilerCallbacks));
}

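// Note: on Windows the destructor does not stop the worker threads (see the #ifndef
// below), so on that platform WaitforAllAndShutdown() needs to be called explicitly if a
// clean shutdown is required before the scheduler is destroyed.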
TaskScheduler::~TaskScheduler()
{
#ifndef _WIN32
    WaitforAllAndShutdown();
#endif
}

void TaskScheduler::Initialize( uint32_t numThreads_ )
{
    assert( numThreads_ );
    StopThreads( true ); // Stops threads, waiting for them.
    delete[] m_pPipesPerThread;
    delete[] m_pPinnedTaskListPerThread;

    m_NumThreads = numThreads_;

    m_pPipesPerThread          = new TaskPipe[ m_NumThreads ];
    m_pPinnedTaskListPerThread = new PinnedTaskList[ m_NumThreads ];

    StartThreads();
}

void TaskScheduler::Initialize()
{
    Initialize( GetNumHardwareThreads() );
}