1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4  * License, v. 2.0. If a copy of the MPL was not distributed with this
5  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 
7 #ifndef MOZILLA_GFX_TASKSCHEDULER_H_
8 #define MOZILLA_GFX_TASKSCHEDULER_H_
9 
10 #include "mozilla/RefPtr.h"
11 #include "mozilla/gfx/Types.h"
12 #include "mozilla/RefCounted.h"
13 
14 #ifdef WIN32
15 #include "mozilla/gfx/JobScheduler_win32.h"
16 #else
17 #include "mozilla/gfx/JobScheduler_posix.h"
18 #endif
19 
20 #include <vector>
21 
22 namespace mozilla {
23 namespace gfx {
24 
25 class MultiThreadedJobQueue;
26 class SyncObject;
27 class WorkerThread;
28 
29 class JobScheduler {
30  public:
31   /// Return one of the queues that the drawing worker threads pull from, chosen
32   /// pseudo-randomly.
GetDrawingQueue()33   static MultiThreadedJobQueue* GetDrawingQueue() {
34     return sSingleton->mDrawingQueues[sSingleton->mNextQueue++ %
35                                       sSingleton->mDrawingQueues.size()];
36   }
37 
38   /// Return one of the queues that the drawing worker threads pull from with a
39   /// hash to choose the queue.
40   ///
41   /// Calling this function several times with the same hash will yield the same
42   /// queue.
GetDrawingQueue(uint32_t aHash)43   static MultiThreadedJobQueue* GetDrawingQueue(uint32_t aHash) {
44     return sSingleton
45         ->mDrawingQueues[aHash % sSingleton->mDrawingQueues.size()];
46   }
47 
48   /// Return the task queue associated to the worker the task is pinned to if
49   /// the task is pinned to a worker, or a random queue.
50   static MultiThreadedJobQueue* GetQueueForJob(Job* aJob);
51 
52   /// Initialize the task scheduler with aNumThreads worker threads for drawing
53   /// and aNumQueues task queues.
54   ///
55   /// The number of threads must be superior or equal to the number of queues
56   /// (since for now a worker thread only pulls from one queue).
57   static bool Init(uint32_t aNumThreads, uint32_t aNumQueues);
58 
59   /// Shut the scheduler down.
60   ///
61   /// This will block until worker threads are joined and deleted.
62   static void ShutDown();
63 
64   /// Returns true if there is a successfully initialized JobScheduler
65   /// singleton.
IsEnabled()66   static bool IsEnabled() { return !!sSingleton; }
67 
68   /// Submit a task buffer to its associated queue.
69   ///
70   /// The caller looses ownership of the task buffer.
71   static void SubmitJob(Job* aJobs);
72 
73   /// Convenience function to block the current thread until a given SyncObject
74   /// is in the signaled state.
75   ///
76   /// The current thread will first try to steal jobs before blocking.
77   static void Join(SyncObject* aCompletionSync);
78 
79   /// Process commands until the command buffer needs to block on a sync object,
80   /// completes, yields, or encounters an error.
81   ///
82   /// Can be used on any thread. Worker threads basically loop over this, but
83   /// the main thread can also dequeue pending task buffers and process them
84   /// alongside the worker threads if it is about to block until completion
85   /// anyway.
86   ///
87   /// The caller looses ownership of the task buffer.
88   static JobStatus ProcessJob(Job* aJobs);
89 
90  protected:
91   static JobScheduler* sSingleton;
92 
93   // queues of Job that are ready to be processed
94   std::vector<MultiThreadedJobQueue*> mDrawingQueues;
95   std::vector<WorkerThread*> mWorkerThreads;
96   Atomic<uint32_t> mNextQueue;
97 };
98 
99 /// Jobs are not reference-counted because they don't have shared ownership.
100 /// The ownership of tasks can change when they are passed to certain methods
/// of JobScheduler and SyncObject. See the documentation of these classes.
102 class Job {
103  public:
104   Job(SyncObject* aStart, SyncObject* aCompletion,
105       WorkerThread* aThread = nullptr);
106 
107   virtual ~Job();
108 
109   virtual JobStatus Run() = 0;
110 
111   /// For use in JobScheduler::SubmitJob. Don't use it anywhere else.
112   // already_AddRefed<SyncObject> GetAndResetStartSync();
GetStartSync()113   SyncObject* GetStartSync() { return mStartSync; }
114 
IsPinnedToAThread()115   bool IsPinnedToAThread() const { return !!mPinToThread; }
116 
GetWorkerThread()117   WorkerThread* GetWorkerThread() { return mPinToThread; }
118 
119  protected:
120   // An intrusive linked list of tasks waiting for a sync object to enter the
121   // signaled state. When the task is not waiting for a sync object,
122   // mNextWaitingJob should be null. This is only accessed from the thread that
123   // owns the task.
124   Job* mNextWaitingJob;
125 
126   RefPtr<SyncObject> mStartSync;
127   RefPtr<SyncObject> mCompletionSync;
128   WorkerThread* mPinToThread;
129 
130   friend class SyncObject;
131 };
132 
133 class EventObject;
134 
135 /// This task will set an EventObject.
136 ///
137 /// Typically used as the final task, so that the main thread can block on the
/// corresponding EventObject until all of the tasks are processed.
139 class SetEventJob : public Job {
140  public:
141   explicit SetEventJob(EventObject* aEvent, SyncObject* aStart,
142                        SyncObject* aCompletion = nullptr,
143                        WorkerThread* aPinToWorker = nullptr);
144 
145   ~SetEventJob();
146 
147   JobStatus Run() override;
148 
GetEvent()149   EventObject* GetEvent() { return mEvent; }
150 
151  protected:
152   RefPtr<EventObject> mEvent;
153 };
154 
155 /// A synchronization object that can be used to express dependencies and
156 /// ordering between tasks.
157 ///
158 /// Jobs can register to SyncObjects in order to asynchronously wait for a
/// signal. In practice, Job objects usually start with a sync object
/// (startSync) and end with another one (completionSync). A Job never gets
/// processed before its startSync is in the signaled state, and signals its
/// completionSync as soon as it finishes. This is how dependencies between
/// tasks are expressed.
class SyncObject final : public external::AtomicRefCounted<SyncObject> {
 public:
  MOZ_DECLARE_REFCOUNTED_TYPENAME(SyncObject)

  /// Create a synchronization object.
  ///
  /// aNumPrerequisites represents the number of times the object must be
  /// signaled before actually entering the signaled state (in other words, it
  /// means the number of dependencies of this sync object).
  ///
  /// Explicitly specifying the number of prerequisites when creating sync
  /// objects makes it easy to start scheduling some of the prerequisite tasks
  /// while creating the others, which is how we typically use the task
  /// scheduler. Automatically determining the number of prerequisites using
  /// Job's constructor brings the risk that the sync object enters the signaled
  /// state while we are still adding prerequisites, which is hard to fix
  /// without using mutexes.
  explicit SyncObject(uint32_t aNumPrerequisites = 1);

  ~SyncObject();

  /// Attempt to register a task.
  ///
  /// If the sync object is already in the signaled state, the buffer is *not*
  /// registered and the sync object does not take ownership of the task.
  /// If the object is not yet in the signaled state, it takes ownership of
  /// the task and places it in a list of pending tasks.
  /// Pending tasks will not be processed by the worker thread.
  /// When the SyncObject reaches the signaled state, it places the pending
  /// tasks back in the available buffer queue, so that they can be
  /// scheduled again.
  ///
  /// Returns true if the SyncObject is not already in the signaled state.
  /// This means that if this method returns true, the SyncObject has taken
  /// ownership of the Job.
  bool Register(Job* aJob);

  /// Signal the SyncObject.
  ///
  /// This decrements an internal counter. The sync object reaches the signaled
  /// state when the counter gets to zero.
  void Signal();

  /// Returns true if mSignals is equal to zero. In other words, returns true
  /// if all prerequisite tasks have already signaled the sync object.
  bool IsSignaled();

  /// Asserts that the number of added prerequisites is equal to the number
  /// specified in the constructor (does nothing in release builds).
  void FreezePrerequisites();

 private:
  // Called by Job's constructor
  void AddSubsequent(Job* aJob);
  void AddPrerequisite(Job* aJob);

  void AddWaitingJob(Job* aJob);

  void SubmitWaitingJobs();

  // Counter checked by IsSignaled(): the object is in the signaled state when
  // it is equal to zero.
  Atomic<int32_t> mSignals;
  // NOTE(review): presumably the head of the intrusive list of pending jobs
  // chained through Job::mNextWaitingJob -- confirm against the implementation.
  Atomic<Job*> mFirstWaitingJob;

#ifdef DEBUG
  // Debug-only bookkeeping. NOTE(review): presumably the expected and the
  // actually added prerequisite counts compared by FreezePrerequisites() --
  // confirm against the implementation.
  uint32_t mNumPrerequisites;
  Atomic<uint32_t> mAddedPrerequisites;
#endif

  friend class Job;
  friend class JobScheduler;
};
234 
235 /// Base class for worker threads.
236 class WorkerThread {
237  public:
238   static WorkerThread* Create(MultiThreadedJobQueue* aJobQueue);
239 
~WorkerThread()240   virtual ~WorkerThread() {}
241 
242   void Run();
243 
GetJobQueue()244   MultiThreadedJobQueue* GetJobQueue() { return mQueue; }
245 
246  protected:
247   explicit WorkerThread(MultiThreadedJobQueue* aJobQueue);
248 
SetName(const char * aName)249   virtual void SetName(const char* aName) {}
250 
251   MultiThreadedJobQueue* mQueue;
252 };
253 
254 }  // namespace gfx
255 }  // namespace mozilla
256 
257 #endif
258