1 // Copyright (c) 2013 Doug Binks
2 //
3 // This software is provided 'as-is', without any express or implied
4 // warranty. In no event will the authors be held liable for any damages
5 // arising from the use of this software.
6 //
7 // Permission is granted to anyone to use this software for any purpose,
8 // including commercial applications, and to alter it and redistribute it
9 // freely, subject to the following restrictions:
10 //
11 // 1. The origin of this software must not be misrepresented; you must not
12 //    claim that you wrote the original software. If you use this software
13 //    in a product, an acknowledgement in the product documentation would be
14 //    appreciated but is not required.
15 // 2. Altered source versions must be plainly marked as such, and must not be
16 //    misrepresented as being the original software.
17 // 3. This notice may not be removed or altered from any source distribution.
18 
19 #pragma once
20 
21 #include <stdint.h>
22 #include "Threads.h"
23 #include "Atomics.h"
24 
// ENKITS_API decorates the public TaskScheduler entry points declared below.
//   _WIN32   + ENKITS_BUILD_DLL : building enkiTS as a DLL            -> dllexport
//   _WIN32   + ENKITS_DLL       : using enkiTS as a DLL               -> dllimport
//   __GNUC__ + ENKITS_BUILD_DLL : building enkiTS as a shared library -> default visibility
//   anything else               : expands to nothing
#if defined(_WIN32)
    #if defined(ENKITS_BUILD_DLL)
        #define ENKITS_API __declspec(dllexport)
    #elif defined(ENKITS_DLL)
        #define ENKITS_API __declspec(dllimport)
    #else
        #define ENKITS_API
    #endif
#elif defined(__GNUC__) && defined(ENKITS_BUILD_DLL)
    #define ENKITS_API __attribute__((visibility("default")))
#else
    #define ENKITS_API
#endif
37 
38 namespace enki
39 {
40 
41     struct TaskSetPartition
42     {
43         uint32_t start;
44         uint32_t end;
45     };
46 
47     class  TaskScheduler;
48     class  TaskPipe;
49     class  PinnedTaskList;
50     struct ThreadArgs;
51     struct SubTaskSet;
52 
53     // ICompletable is a base class used to check for completion.
54     // Do not use this class directly, instead derive from ITaskSet or IPinnedTask.
55     class ICompletable
56     {
57     public:
ICompletable()58         ICompletable() :        m_RunningCount(0) {}
59         virtual ~ICompletable() = default;
GetIsComplete()60         bool                    GetIsComplete() {
61             bool bRet = ( 0 == m_RunningCount );
62             BASE_MEMORYBARRIER_ACQUIRE();
63             return bRet; }
64     private:
65         friend class            TaskScheduler;
66         volatile int32_t        m_RunningCount;
67     };
68 
69     // Subclass ITaskSet to create tasks.
70     // TaskSets can be re-used, but check completion first.
71     class ITaskSet : public ICompletable
72     {
73     public:
ITaskSet()74         ITaskSet()
75             : m_SetSize(1)
76             , m_MinRange(1)
77             , m_RangeToRun(1)
78         {}
79 
ITaskSet(uint32_t setSize_)80         ITaskSet( uint32_t setSize_ )
81             : m_SetSize( setSize_ )
82             , m_MinRange(1)
83             , m_RangeToRun(1)
84         {}
85 
ITaskSet(uint32_t setSize_,uint32_t minRange_)86         ITaskSet( uint32_t setSize_, uint32_t minRange_ )
87             : m_SetSize( setSize_ )
88             , m_MinRange( minRange_ )
89             , m_RangeToRun(minRange_)
90         {}
91 
92         virtual ~ITaskSet() override = default;
93 
94         // Execute range should be overloaded to process tasks. It will be called with a
95         // range_ where range.start >= 0; range.start < range.end; and range.end < m_SetSize;
96         // The range values should be mapped so that linearly processing them in order is cache friendly
97         // i.e. neighbouring values should be close together.
98         // threadnum should not be used for changing processing of data, it's intended purpose
99         // is to allow per-thread data buckets for output.
100         virtual void            ExecuteRange( TaskSetPartition range, uint32_t threadnum  ) = 0;
101 
102         // Size of set - usually the number of data items to be processed, see ExecuteRange. Defaults to 1
103         uint32_t                m_SetSize;
104 
105         // Minimum size of of TaskSetPartition range when splitting a task set into partitions.
106         // This should be set to a value which results in computation effort of at least 10k
107         // clock cycles to minimize tast scheduler overhead.
108         // NOTE: The last partition will be smaller than m_MinRange if m_SetSize is not a multiple
109         // of m_MinRange.
110         // Also known as grain size in literature.
111         uint32_t                m_MinRange;
112 
113     private:
114         friend class            TaskScheduler;
115         uint32_t                m_RangeToRun;
116     };
117 
118     // Subclass IPinnedTask to create tasks which cab be run on a given thread only.
119     class IPinnedTask : public ICompletable
120     {
121     public:
IPinnedTask()122         IPinnedTask()                      : threadNum(0), pNext(NULL) {}  // default is to run a task on main thread
IPinnedTask(uint32_t threadNum_)123         IPinnedTask( uint32_t threadNum_ ) : threadNum(threadNum_), pNext(NULL) {}  // default is to run a task on main thread
124 
125         virtual ~IPinnedTask() override = default;
126 
127 
128         // IPinnedTask needs to be non abstract for intrusive list functionality.
129         // Should never be called as should be overridden.
Execute()130         virtual void            Execute() { assert(false); }
131 
132 
133         uint32_t                 threadNum; // thread to run this pinned task on
134         IPinnedTask* volatile pNext;        // Do not use. For intrusive list only.
135     };
136 
137     // TaskScheduler implements several callbacks intended for profilers
138     typedef void (*ProfilerCallbackFunc)( uint32_t threadnum_ );
139     struct ProfilerCallbacks
140     {
141         ProfilerCallbackFunc threadStart;
142         ProfilerCallbackFunc threadStop;
143         ProfilerCallbackFunc waitStart;
144         ProfilerCallbackFunc waitStop;
145     };
146 
147     class TaskScheduler
148     {
149     public:
150         ENKITS_API TaskScheduler();
151         ENKITS_API ~TaskScheduler();
152 
153         // Call either Initialize() or Initialize( numThreads_ ) before adding tasks.
154 
155         // Initialize() will create GetNumHardwareThreads()-1 threads, which is
156         // sufficient to fill the system when including the main thread.
157         // Initialize can be called multiple times - it will wait for completion
158         // before re-initializing.
159         ENKITS_API void            Initialize();
160 
161         // Initialize( numThreads_ ) - numThreads_ (must be > 0)
162         // will create numThreads_-1 threads, as thread 0 is
163         // the thread on which the initialize was called.
164         ENKITS_API void            Initialize( uint32_t numThreads_ );
165 
166 
167         // Adds the TaskSet to pipe and returns if the pipe is not full.
168         // If the pipe is full, pTaskSet is run.
169         // should only be called from main thread, or within a task
170         ENKITS_API void            AddTaskSetToPipe( ITaskSet* pTaskSet );
171 
172         // Thread 0 is main thread, otherwise use threadNum
173         ENKITS_API void            AddPinnedTask( IPinnedTask* pTask_ );
174 
175         // This function will run any IPinnedTask* for current thread, but not run other
176         // Main thread should call this or use a wait to ensure it's tasks are run.
177         ENKITS_API void            RunPinnedTasks();
178 
179         // Runs the TaskSets in pipe until true == pTaskSet->GetIsComplete();
180         // should only be called from thread which created the taskscheduler , or within a task
181         // if called with 0 it will try to run tasks, and return if none available.
182         ENKITS_API void            WaitforTask( const ICompletable* pCompletable_ );
183 
184         // WaitforTaskSet, deprecated interface use WaitforTask
WaitforTaskSet(const ICompletable * pCompletable_)185         inline void     WaitforTaskSet( const ICompletable* pCompletable_ ) { WaitforTask( pCompletable_ ); }
186 
187         // Waits for all task sets to complete - not guaranteed to work unless we know we
188         // are in a situation where tasks aren't being continuosly added.
189         ENKITS_API void            WaitforAll();
190 
191         // Waits for all task sets to complete and shutdown threads - not guaranteed to work unless we know we
192         // are in a situation where tasks aren't being continuosly added.
193         ENKITS_API void            WaitforAllAndShutdown();
194 
195         // Returns the number of threads created for running tasks + 1
196         // to account for the main thread.
197         ENKITS_API uint32_t        GetNumTaskThreads() const;
198 
199         // Returns the ProfilerCallbacks structure so that it can be modified to
200         // set the callbacks.
201         ENKITS_API ProfilerCallbacks* GetProfilerCallbacks();
202 
203     private:
204         static THREADFUNC_DECL  TaskingThreadFunction( void* pArgs );
205         void             WaitForTasks( uint32_t threadNum );
206         void             RunPinnedTasks( uint32_t threadNum );
207         bool             TryRunTask( uint32_t threadNum, uint32_t& hintPipeToCheck_io_ );
208         void             StartThreads();
209         void             StopThreads( bool bWait_ );
210         void             SplitAndAddTask( uint32_t threadNum_, SubTaskSet subTask_, uint32_t rangeToSplit_ );
211         void             WakeThreads( int32_t maxToWake_ = 0 );
212 
213         TaskPipe*                                                m_pPipesPerThread;
214         PinnedTaskList*                                          m_pPinnedTaskListPerThread;
215 
216         uint32_t                                                 m_NumThreads;
217         ThreadArgs*                                              m_pThreadArgStore;
218         threadid_t*                                              m_pThreadIDs;
219         volatile bool                                            m_bRunning;
220         volatile int32_t                                         m_NumThreadsRunning;
221         volatile int32_t                                         m_NumThreadsWaiting;
222         uint32_t                                                 m_NumPartitions;
223         uint32_t                                                 m_NumInitialPartitions;
224         semaphoreid_t                                            m_NewTaskSemaphore;
225         bool                                                     m_bHaveThreads;
226         ProfilerCallbacks                                         m_ProfilerCallbacks;
227 
228         TaskScheduler( const TaskScheduler& nocopy );
229         TaskScheduler& operator=( const TaskScheduler& nocopy );
230     };
231 
232 }
233