1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4  * License, v. 2.0. If a copy of the MPL was not distributed with this
5  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 
7 #include "JobScheduler.h"
8 #include "Logging.h"
9 
10 namespace mozilla {
11 namespace gfx {
12 
13 JobScheduler* JobScheduler::sSingleton = nullptr;
14 
Init(uint32_t aNumThreads,uint32_t aNumQueues)15 bool JobScheduler::Init(uint32_t aNumThreads, uint32_t aNumQueues) {
16   MOZ_ASSERT(!sSingleton);
17   MOZ_ASSERT(aNumThreads >= aNumQueues);
18 
19   sSingleton = new JobScheduler();
20   sSingleton->mNextQueue = 0;
21 
22   for (uint32_t i = 0; i < aNumQueues; ++i) {
23     sSingleton->mDrawingQueues.push_back(new MultiThreadedJobQueue());
24   }
25 
26   for (uint32_t i = 0; i < aNumThreads; ++i) {
27     sSingleton->mWorkerThreads.push_back(
28         WorkerThread::Create(sSingleton->mDrawingQueues[i % aNumQueues]));
29   }
30   return true;
31 }
32 
ShutDown()33 void JobScheduler::ShutDown() {
34   MOZ_ASSERT(IsEnabled());
35   if (!IsEnabled()) {
36     return;
37   }
38 
39   for (auto queue : sSingleton->mDrawingQueues) {
40     queue->ShutDown();
41     delete queue;
42   }
43 
44   for (WorkerThread* thread : sSingleton->mWorkerThreads) {
45     // this will block until the thread is joined.
46     delete thread;
47   }
48 
49   sSingleton->mWorkerThreads.clear();
50   delete sSingleton;
51   sSingleton = nullptr;
52 }
53 
ProcessJob(Job * aJob)54 JobStatus JobScheduler::ProcessJob(Job* aJob) {
55   MOZ_ASSERT(aJob);
56   auto status = aJob->Run();
57   if (status == JobStatus::Error || status == JobStatus::Complete) {
58     delete aJob;
59   }
60   return status;
61 }
62 
SubmitJob(Job * aJob)63 void JobScheduler::SubmitJob(Job* aJob) {
64   MOZ_ASSERT(aJob);
65   RefPtr<SyncObject> start = aJob->GetStartSync();
66   if (start && start->Register(aJob)) {
67     // The Job buffer starts with a non-signaled sync object, it
68     // is now registered in the list of task buffers waiting on the
69     // sync object, so we should not place it in the queue.
70     return;
71   }
72 
73   GetQueueForJob(aJob)->SubmitJob(aJob);
74 }
75 
Join(SyncObject * aCompletion)76 void JobScheduler::Join(SyncObject* aCompletion) {
77   RefPtr<EventObject> waitForCompletion = new EventObject();
78   JobScheduler::SubmitJob(new SetEventJob(waitForCompletion, aCompletion));
79   waitForCompletion->Wait();
80 }
81 
GetQueueForJob(Job * aJob)82 MultiThreadedJobQueue* JobScheduler::GetQueueForJob(Job* aJob) {
83   return aJob->IsPinnedToAThread() ? aJob->GetWorkerThread()->GetJobQueue()
84                                    : GetDrawingQueue();
85 }
86 
Job(SyncObject * aStart,SyncObject * aCompletion,WorkerThread * aThread)87 Job::Job(SyncObject* aStart, SyncObject* aCompletion, WorkerThread* aThread)
88     : mNextWaitingJob(nullptr),
89       mStartSync(aStart),
90       mCompletionSync(aCompletion),
91       mPinToThread(aThread) {
92   if (mStartSync) {
93     mStartSync->AddSubsequent(this);
94   }
95   if (mCompletionSync) {
96     mCompletionSync->AddPrerequisite(this);
97   }
98 }
99 
~Job()100 Job::~Job() {
101   if (mCompletionSync) {
102     // printf(" -- Job %p dtor completion %p\n", this, mCompletionSync);
103     mCompletionSync->Signal();
104     mCompletionSync = nullptr;
105   }
106 }
107 
Run()108 JobStatus SetEventJob::Run() {
109   mEvent->Set();
110   return JobStatus::Complete;
111 }
112 
SetEventJob(EventObject * aEvent,SyncObject * aStart,SyncObject * aCompletion,WorkerThread * aWorker)113 SetEventJob::SetEventJob(EventObject* aEvent, SyncObject* aStart,
114                          SyncObject* aCompletion, WorkerThread* aWorker)
115     : Job(aStart, aCompletion, aWorker), mEvent(aEvent) {}
116 
~SetEventJob()117 SetEventJob::~SetEventJob() {}
118 
SyncObject(uint32_t aNumPrerequisites)119 SyncObject::SyncObject(uint32_t aNumPrerequisites)
120     : mSignals(aNumPrerequisites),
121       mFirstWaitingJob(nullptr)
122 #ifdef DEBUG
123       ,
124       mNumPrerequisites(aNumPrerequisites),
125       mAddedPrerequisites(0)
126 #endif
127 {
128 }
129 
~SyncObject()130 SyncObject::~SyncObject() { MOZ_ASSERT(mFirstWaitingJob == nullptr); }
131 
Register(Job * aJob)132 bool SyncObject::Register(Job* aJob) {
133   MOZ_ASSERT(aJob);
134 
135   // For now, ensure that when we schedule the first subsequent, we have already
136   // created all of the prerequisites. This is an arbitrary restriction because
137   // we specify the number of prerequisites in the constructor, but in the
138   // typical scenario, if the assertion FreezePrerequisite blows up here it
139   // probably means we got the initial nmber of prerequisites wrong. We can
140   // decide to remove this restriction if needed.
141   FreezePrerequisites();
142 
143   int32_t signals = mSignals;
144 
145   if (signals > 0) {
146     AddWaitingJob(aJob);
147     // Since Register and Signal can be called concurrently, it can happen that
148     // reading mSignals in Register happens before decrementing mSignals in
149     // Signal, but SubmitWaitingJobs happens before AddWaitingJob. This ordering
150     // means the SyncObject ends up in the signaled state with a task sitting in
151     // the waiting list. To prevent that we check mSignals a second time and
152     // submit again if signals reached zero in the mean time. We do this instead
153     // of holding a mutex around mSignals+mJobs to reduce lock contention.
154     int32_t signals2 = mSignals;
155     if (signals2 == 0) {
156       SubmitWaitingJobs();
157     }
158     return true;
159   }
160 
161   return false;
162 }
163 
Signal()164 void SyncObject::Signal() {
165   int32_t signals = --mSignals;
166   MOZ_ASSERT(signals >= 0);
167 
168   if (signals == 0) {
169     SubmitWaitingJobs();
170   }
171 }
172 
AddWaitingJob(Job * aJob)173 void SyncObject::AddWaitingJob(Job* aJob) {
174   // Push (using atomics) the task into the list of waiting tasks.
175   for (;;) {
176     Job* first = mFirstWaitingJob;
177     aJob->mNextWaitingJob = first;
178     if (mFirstWaitingJob.compareExchange(first, aJob)) {
179       break;
180     }
181   }
182 }
183 
SubmitWaitingJobs()184 void SyncObject::SubmitWaitingJobs() {
185   // Scheduling the tasks can cause code that modifies <this>'s reference
186   // count to run concurrently, and cause the caller of this function to
187   // be owned by another thread. We need to make sure the reference count
188   // does not reach 0 on another thread before the end of this method, so
189   // hold a strong ref to prevent that!
190   RefPtr<SyncObject> kungFuDeathGrip(this);
191 
192   // First atomically swap mFirstWaitingJob and waitingJobs...
193   Job* waitingJobs = nullptr;
194   for (;;) {
195     waitingJobs = mFirstWaitingJob;
196     if (mFirstWaitingJob.compareExchange(waitingJobs, nullptr)) {
197       break;
198     }
199   }
200 
201   // ... and submit all of the waiting tasks in waitingJob now that they belong
202   // to this thread.
203   while (waitingJobs) {
204     Job* next = waitingJobs->mNextWaitingJob;
205     waitingJobs->mNextWaitingJob = nullptr;
206     JobScheduler::GetQueueForJob(waitingJobs)->SubmitJob(waitingJobs);
207     waitingJobs = next;
208   }
209 }
210 
IsSignaled()211 bool SyncObject::IsSignaled() { return mSignals == 0; }
212 
FreezePrerequisites()213 void SyncObject::FreezePrerequisites() {
214   MOZ_ASSERT(mAddedPrerequisites == mNumPrerequisites);
215 }
216 
AddPrerequisite(Job * aJob)217 void SyncObject::AddPrerequisite(Job* aJob) {
218   MOZ_ASSERT(++mAddedPrerequisites <= mNumPrerequisites);
219 }
220 
AddSubsequent(Job * aJob)221 void SyncObject::AddSubsequent(Job* aJob) {}
222 
WorkerThread(MultiThreadedJobQueue * aJobQueue)223 WorkerThread::WorkerThread(MultiThreadedJobQueue* aJobQueue)
224     : mQueue(aJobQueue) {
225   aJobQueue->RegisterThread();
226 }
227 
Run()228 void WorkerThread::Run() {
229   SetName("gfx worker");
230 
231   for (;;) {
232     Job* commands = nullptr;
233     if (!mQueue->WaitForJob(commands)) {
234       mQueue->UnregisterThread();
235       return;
236     }
237 
238     JobStatus status = JobScheduler::ProcessJob(commands);
239 
240     if (status == JobStatus::Error) {
241       // Don't try to handle errors for now, but that's open to discussions.
242       // I expect errors to be mostly OOM issues.
243       gfxDevCrash(LogReason::JobStatusError)
244           << "Invalid job status " << (int)status;
245     }
246   }
247 }
248 
249 }  // namespace gfx
250 }  // namespace mozilla
251