1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4  * License, v. 2.0. If a copy of the MPL was not distributed with this
5  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 
7 #include "mozilla/TaskQueue.h"
8 
9 #include "mozilla/DelayedRunnable.h"
10 #include "mozilla/ProfilerRunnable.h"
11 #include "nsThreadUtils.h"
12 
13 namespace mozilla {
14 
TaskQueue(already_AddRefed<nsIEventTarget> aTarget,const char * aName,bool aRequireTailDispatch)15 TaskQueue::TaskQueue(already_AddRefed<nsIEventTarget> aTarget,
16                      const char* aName, bool aRequireTailDispatch)
17     : AbstractThread(aRequireTailDispatch),
18       mTarget(aTarget),
19       mQueueMonitor("TaskQueue::Queue"),
20       mTailDispatcher(nullptr),
21       mIsRunning(false),
22       mIsShutdown(false),
23       mName(aName) {}
24 
~TaskQueue()25 TaskQueue::~TaskQueue() {
26   // No one is referencing this TaskQueue anymore, meaning no tasks can be
27   // pending as all Runner hold a reference to this TaskQueue.
28   MOZ_ASSERT(mScheduledDelayedRunnables.IsEmpty());
29 }
30 
31 NS_IMPL_ISUPPORTS_INHERITED(TaskQueue, AbstractThread, nsIDirectTaskDispatcher,
32                             nsIDelayedRunnableObserver);
33 
TailDispatcher()34 TaskDispatcher& TaskQueue::TailDispatcher() {
35   MOZ_ASSERT(IsCurrentThreadIn());
36   MOZ_ASSERT(mTailDispatcher);
37   return *mTailDispatcher;
38 }
39 
40 // Note aRunnable is passed by ref to support conditional ownership transfer.
41 // See Dispatch() in TaskQueue.h for more details.
DispatchLocked(nsCOMPtr<nsIRunnable> & aRunnable,uint32_t aFlags,DispatchReason aReason)42 nsresult TaskQueue::DispatchLocked(nsCOMPtr<nsIRunnable>& aRunnable,
43                                    uint32_t aFlags, DispatchReason aReason) {
44   mQueueMonitor.AssertCurrentThreadOwns();
45   if (mIsShutdown) {
46     return NS_ERROR_FAILURE;
47   }
48 
49   AbstractThread* currentThread;
50   if (aReason != TailDispatch && (currentThread = GetCurrent()) &&
51       RequiresTailDispatch(currentThread) &&
52       currentThread->IsTailDispatcherAvailable()) {
53     MOZ_ASSERT(aFlags == NS_DISPATCH_NORMAL,
54                "Tail dispatch doesn't support flags");
55     return currentThread->TailDispatcher().AddTask(this, aRunnable.forget());
56   }
57 
58   LogRunnable::LogDispatch(aRunnable);
59   mTasks.Push({std::move(aRunnable), aFlags});
60 
61   if (mIsRunning) {
62     return NS_OK;
63   }
64   RefPtr<nsIRunnable> runner(new Runner(this));
65   nsresult rv = mTarget->Dispatch(runner.forget(), aFlags);
66   if (NS_FAILED(rv)) {
67     NS_WARNING("Failed to dispatch runnable to run TaskQueue");
68     return rv;
69   }
70   mIsRunning = true;
71 
72   return NS_OK;
73 }
74 
AwaitIdle()75 void TaskQueue::AwaitIdle() {
76   MonitorAutoLock mon(mQueueMonitor);
77   AwaitIdleLocked();
78 }
79 
AwaitIdleLocked()80 void TaskQueue::AwaitIdleLocked() {
81   // Make sure there are no tasks for this queue waiting in the caller's tail
82   // dispatcher.
83   MOZ_ASSERT_IF(AbstractThread::GetCurrent(),
84                 !AbstractThread::GetCurrent()->HasTailTasksFor(this));
85 
86   mQueueMonitor.AssertCurrentThreadOwns();
87   MOZ_ASSERT(mIsRunning || mTasks.IsEmpty());
88   while (mIsRunning) {
89     mQueueMonitor.Wait();
90   }
91 }
92 
AwaitShutdownAndIdle()93 void TaskQueue::AwaitShutdownAndIdle() {
94   MOZ_ASSERT(!IsCurrentThreadIn());
95   // Make sure there are no tasks for this queue waiting in the caller's tail
96   // dispatcher.
97   MOZ_ASSERT_IF(AbstractThread::GetCurrent(),
98                 !AbstractThread::GetCurrent()->HasTailTasksFor(this));
99 
100   MonitorAutoLock mon(mQueueMonitor);
101   while (!mIsShutdown) {
102     mQueueMonitor.Wait();
103   }
104   AwaitIdleLocked();
105 }
106 
OnDelayedRunnableCreated(DelayedRunnable * aRunnable)107 void TaskQueue::OnDelayedRunnableCreated(DelayedRunnable* aRunnable) {
108 #ifdef DEBUG
109   MonitorAutoLock mon(mQueueMonitor);
110   MOZ_ASSERT(!mDelayedRunnablesCancelPromise);
111 #endif
112 }
113 
OnDelayedRunnableScheduled(DelayedRunnable * aRunnable)114 void TaskQueue::OnDelayedRunnableScheduled(DelayedRunnable* aRunnable) {
115   MOZ_ASSERT(IsOnCurrentThread());
116   mScheduledDelayedRunnables.AppendElement(aRunnable);
117 }
118 
OnDelayedRunnableRan(DelayedRunnable * aRunnable)119 void TaskQueue::OnDelayedRunnableRan(DelayedRunnable* aRunnable) {
120   MOZ_ASSERT(IsOnCurrentThread());
121   Unused << mScheduledDelayedRunnables.RemoveElement(aRunnable);
122 }
123 
CancelDelayedRunnables()124 auto TaskQueue::CancelDelayedRunnables() -> RefPtr<CancelPromise> {
125   MonitorAutoLock mon(mQueueMonitor);
126   return CancelDelayedRunnablesLocked();
127 }
128 
CancelDelayedRunnablesLocked()129 auto TaskQueue::CancelDelayedRunnablesLocked() -> RefPtr<CancelPromise> {
130   mQueueMonitor.AssertCurrentThreadOwns();
131   if (mDelayedRunnablesCancelPromise) {
132     return mDelayedRunnablesCancelPromise;
133   }
134   mDelayedRunnablesCancelPromise =
135       mDelayedRunnablesCancelHolder.Ensure(__func__);
136   nsCOMPtr<nsIRunnable> cancelRunnable =
137       NewRunnableMethod("TaskQueue::CancelDelayedRunnablesImpl", this,
138                         &TaskQueue::CancelDelayedRunnablesImpl);
139   MOZ_ALWAYS_SUCCEEDS(DispatchLocked(/* passed by ref */ cancelRunnable,
140                                      NS_DISPATCH_NORMAL, TailDispatch));
141   return mDelayedRunnablesCancelPromise;
142 }
143 
CancelDelayedRunnablesImpl()144 void TaskQueue::CancelDelayedRunnablesImpl() {
145   MOZ_ASSERT(IsOnCurrentThread());
146   for (const auto& runnable : mScheduledDelayedRunnables) {
147     runnable->CancelTimer();
148   }
149   mScheduledDelayedRunnables.Clear();
150   mDelayedRunnablesCancelHolder.Resolve(true, __func__);
151 }
152 
BeginShutdown()153 RefPtr<ShutdownPromise> TaskQueue::BeginShutdown() {
154   // Dispatch any tasks for this queue waiting in the caller's tail dispatcher,
155   // since this is the last opportunity to do so.
156   if (AbstractThread* currentThread = AbstractThread::GetCurrent()) {
157     currentThread->TailDispatchTasksFor(this);
158   }
159   MonitorAutoLock mon(mQueueMonitor);
160   Unused << CancelDelayedRunnablesLocked();
161   mIsShutdown = true;
162   RefPtr<ShutdownPromise> p = mShutdownPromise.Ensure(__func__);
163   MaybeResolveShutdown();
164   mon.NotifyAll();
165   return p;
166 }
167 
IsEmpty()168 bool TaskQueue::IsEmpty() {
169   MonitorAutoLock mon(mQueueMonitor);
170   return mTasks.IsEmpty();
171 }
172 
IsCurrentThreadIn() const173 bool TaskQueue::IsCurrentThreadIn() const {
174   bool in = mRunningThread == PR_GetCurrentThread();
175   return in;
176 }
177 
Run()178 nsresult TaskQueue::Runner::Run() {
179   TaskStruct event;
180   {
181     MonitorAutoLock mon(mQueue->mQueueMonitor);
182     MOZ_ASSERT(mQueue->mIsRunning);
183     if (mQueue->mTasks.IsEmpty()) {
184       mQueue->mIsRunning = false;
185       mQueue->MaybeResolveShutdown();
186       mon.NotifyAll();
187       return NS_OK;
188     }
189     event = std::move(mQueue->mTasks.FirstElement());
190     mQueue->mTasks.Pop();
191   }
192   MOZ_ASSERT(event.event);
193 
194   // Note that dropping the queue monitor before running the task, and
195   // taking the monitor again after the task has run ensures we have memory
196   // fences enforced. This means that if the object we're calling wasn't
197   // designed to be threadsafe, it will be, provided we're only calling it
198   // in this task queue.
199   {
200     AutoTaskGuard g(mQueue);
201     SerialEventTargetGuard tg(mQueue);
202     {
203       LogRunnable::Run log(event.event);
204 
205       AUTO_PROFILE_FOLLOWING_RUNNABLE(event.event);
206       event.event->Run();
207 
208       // Drop the reference to event. The event will hold a reference to the
209       // object it's calling, and we don't want to keep it alive, it may be
210       // making assumptions what holds references to it. This is especially
211       // the case if the object is waiting for us to shutdown, so that it
212       // can shutdown (like in the MediaDecoderStateMachine's SHUTDOWN case).
213       event.event = nullptr;
214     }
215   }
216 
217   {
218     MonitorAutoLock mon(mQueue->mQueueMonitor);
219     if (mQueue->mTasks.IsEmpty()) {
220       // No more events to run. Exit the task runner.
221       mQueue->mIsRunning = false;
222       mQueue->MaybeResolveShutdown();
223       mon.NotifyAll();
224       return NS_OK;
225     }
226   }
227 
228   // There's at least one more event that we can run. Dispatch this Runner
229   // to the target again to ensure it runs again. Note that we don't just
230   // run in a loop here so that we don't hog the target. This means we may
231   // run on another thread next time, but we rely on the memory fences from
232   // mQueueMonitor for thread safety of non-threadsafe tasks.
233   nsresult rv;
234   {
235     MonitorAutoLock mon(mQueue->mQueueMonitor);
236     rv = mQueue->mTarget->Dispatch(
237         this, mQueue->mTasks.FirstElement().flags | NS_DISPATCH_AT_END);
238   }
239   if (NS_FAILED(rv)) {
240     // Failed to dispatch, shutdown!
241     MonitorAutoLock mon(mQueue->mQueueMonitor);
242     mQueue->mIsRunning = false;
243     mQueue->mIsShutdown = true;
244     mQueue->MaybeResolveShutdown();
245     mon.NotifyAll();
246   }
247 
248   return NS_OK;
249 }
250 
251 //-----------------------------------------------------------------------------
252 // nsIDirectTaskDispatcher
253 //-----------------------------------------------------------------------------
254 
255 NS_IMETHODIMP
DispatchDirectTask(already_AddRefed<nsIRunnable> aEvent)256 TaskQueue::DispatchDirectTask(already_AddRefed<nsIRunnable> aEvent) {
257   if (!IsCurrentThreadIn()) {
258     return NS_ERROR_FAILURE;
259   }
260   mDirectTasks.AddTask(std::move(aEvent));
261   return NS_OK;
262 }
263 
DrainDirectTasks()264 NS_IMETHODIMP TaskQueue::DrainDirectTasks() {
265   if (!IsCurrentThreadIn()) {
266     return NS_ERROR_FAILURE;
267   }
268   mDirectTasks.DrainTasks();
269   return NS_OK;
270 }
271 
HaveDirectTasks(bool * aValue)272 NS_IMETHODIMP TaskQueue::HaveDirectTasks(bool* aValue) {
273   if (!IsCurrentThreadIn()) {
274     return NS_ERROR_FAILURE;
275   }
276 
277   *aValue = mDirectTasks.HaveTasks();
278   return NS_OK;
279 }
280 
281 }  // namespace mozilla
282