1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4  * License, v. 2.0. If a copy of the MPL was not distributed with this
5  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 
7 #if !defined(TaskDispatcher_h_)
8 #  define TaskDispatcher_h_
9 
10 #  include <queue>
11 
12 #  include "mozilla/AbstractThread.h"
13 #  include "mozilla/Maybe.h"
14 #  include "mozilla/UniquePtr.h"
15 #  include "nsIDirectTaskDispatcher.h"
16 #  include "nsISupportsImpl.h"
17 #  include "nsTArray.h"
18 #  include "nsThreadUtils.h"
19 
20 namespace mozilla {
21 
22 class SimpleTaskQueue {
23  public:
24   SimpleTaskQueue() = default;
25   virtual ~SimpleTaskQueue() = default;
26 
AddTask(already_AddRefed<nsIRunnable> aRunnable)27   void AddTask(already_AddRefed<nsIRunnable> aRunnable) {
28     if (!mTasks) {
29       mTasks.emplace();
30     }
31     mTasks->push(std::move(aRunnable));
32   }
33 
DrainTasks()34   void DrainTasks() {
35     if (!mTasks) {
36       return;
37     }
38     auto& queue = mTasks.ref();
39     while (!queue.empty()) {
40       nsCOMPtr<nsIRunnable> r = std::move(queue.front());
41       queue.pop();
42       r->Run();
43     }
44   }
45 
HaveTasks()46   bool HaveTasks() const { return mTasks && !mTasks->empty(); }
47 
48  private:
49   // We use a Maybe<> because (a) when used for DirectTasks it often doesn't get
50   // anything put into it, and (b) the std::queue implementation in GNU
51   // libstdc++ does two largish heap allocations when creating a new std::queue.
52   Maybe<std::queue<nsCOMPtr<nsIRunnable>>> mTasks;
53 };
54 
55 /*
56  * A classic approach to cross-thread communication is to dispatch asynchronous
57  * runnables to perform updates on other threads. This generally works well, but
58  * there are sometimes reasons why we might want to delay the actual dispatch of
59  * these tasks until a specified moment. At present, this is primarily useful to
60  * ensure that mirrored state gets updated atomically - but there may be other
61  * applications as well.
62  *
 * TaskDispatcher is a general abstract class that accepts tasks and dispatches
 * them at some later point. These groups of tasks are per-target-thread, and
 * contain separate queues for several kinds of tasks (see comments below):
 * "state change tasks" (which run first, and are intended to be used to update
 * the value held by mirrors), and regular tasks, which are other arbitrary
 * operations that are gated to run after all the state changes have
 * completed.
70  */
class TaskDispatcher {
 public:
  TaskDispatcher() = default;
  virtual ~TaskDispatcher() = default;

  // Direct tasks are run directly (rather than dispatched asynchronously) when
  // the tail dispatcher fires. A direct task may cause other tasks to be added
  // to the tail dispatcher.
  virtual void AddDirectTask(already_AddRefed<nsIRunnable> aRunnable) = 0;

  // State change tasks are dispatched asynchronously and always run before
  // regular tasks. They are intended to be used to update the value held by
  // mirrors before any other dispatched tasks are run on the target thread.
  virtual void AddStateChangeTask(AbstractThread* aThread,
                                  already_AddRefed<nsIRunnable> aRunnable) = 0;

  // Regular tasks are dispatched asynchronously, and run after state change
  // tasks. Returns an nsresult; the AutoTaskDispatcher implementation in this
  // file always returns NS_OK.
  virtual nsresult AddTask(AbstractThread* aThread,
                           already_AddRefed<nsIRunnable> aRunnable) = 0;

  // Dispatches the tasks queued for |aThread| now instead of waiting for the
  // dispatcher's usual trigger. In AutoTaskDispatcher, every matching group is
  // attempted and the first failing nsresult (if any) is returned.
  virtual nsresult DispatchTasksFor(AbstractThread* aThread) = 0;

  // Returns whether there are tasks pending for |aThread|. AutoTaskDispatcher
  // additionally reports true when |aThread| is the current thread and direct
  // tasks are pending.
  virtual bool HasTasksFor(AbstractThread* aThread) = 0;

  // Runs the pending direct tasks. NOTE(review): the exact draining semantics
  // are left to the implementation; AutoTaskDispatcher forwards to its
  // nsIDirectTaskDispatcher when it has one.
  virtual void DrainDirectTasks() = 0;
};
96 
97 /*
98  * AutoTaskDispatcher is a stack-scoped TaskDispatcher implementation that fires
99  * its queued tasks when it is popped off the stack.
100  */
101 class AutoTaskDispatcher : public TaskDispatcher {
102  public:
103   explicit AutoTaskDispatcher(nsIDirectTaskDispatcher* aDirectTaskDispatcher,
104                               bool aIsTailDispatcher = false)
mDirectTaskDispatcher(aDirectTaskDispatcher)105       : mDirectTaskDispatcher(aDirectTaskDispatcher),
106         mIsTailDispatcher(aIsTailDispatcher) {}
107 
~AutoTaskDispatcher()108   ~AutoTaskDispatcher() {
109     // Given that direct tasks may trigger other code that uses the tail
110     // dispatcher, it's better to avoid processing them in the tail dispatcher's
111     // destructor. So we require TailDispatchers to manually invoke
112     // DrainDirectTasks before the AutoTaskDispatcher gets destroyed. In truth,
113     // this is only necessary in the case where this AutoTaskDispatcher can be
114     // accessed by the direct tasks it dispatches (true for TailDispatchers, but
115     // potentially not true for other hypothetical AutoTaskDispatchers). Feel
116     // free to loosen this restriction to apply only to mIsTailDispatcher if a
117     // use-case requires it.
118     MOZ_ASSERT(!HaveDirectTasks());
119 
120     for (size_t i = 0; i < mTaskGroups.Length(); ++i) {
121       DispatchTaskGroup(std::move(mTaskGroups[i]));
122     }
123   }
124 
HaveDirectTasks()125   bool HaveDirectTasks() {
126     return mDirectTaskDispatcher && mDirectTaskDispatcher->HaveDirectTasks();
127   }
128 
DrainDirectTasks()129   void DrainDirectTasks() override {
130     if (mDirectTaskDispatcher) {
131       mDirectTaskDispatcher->DrainDirectTasks();
132     }
133   }
134 
AddDirectTask(already_AddRefed<nsIRunnable> aRunnable)135   void AddDirectTask(already_AddRefed<nsIRunnable> aRunnable) override {
136     MOZ_ASSERT(mDirectTaskDispatcher);
137     mDirectTaskDispatcher->DispatchDirectTask(std::move(aRunnable));
138   }
139 
AddStateChangeTask(AbstractThread * aThread,already_AddRefed<nsIRunnable> aRunnable)140   void AddStateChangeTask(AbstractThread* aThread,
141                           already_AddRefed<nsIRunnable> aRunnable) override {
142     nsCOMPtr<nsIRunnable> r = aRunnable;
143     MOZ_RELEASE_ASSERT(r);
144     EnsureTaskGroup(aThread).mStateChangeTasks.AppendElement(r.forget());
145   }
146 
AddTask(AbstractThread * aThread,already_AddRefed<nsIRunnable> aRunnable)147   nsresult AddTask(AbstractThread* aThread,
148                    already_AddRefed<nsIRunnable> aRunnable) override {
149     nsCOMPtr<nsIRunnable> r = aRunnable;
150     MOZ_RELEASE_ASSERT(r);
151     // To preserve the event order, we need to append a new group if the last
152     // group is not targeted for |aThread|.
153     // See https://bugzilla.mozilla.org/show_bug.cgi?id=1318226&mark=0-3#c0
154     // for the details of the issue.
155     if (mTaskGroups.Length() == 0 ||
156         mTaskGroups.LastElement()->mThread != aThread) {
157       mTaskGroups.AppendElement(new PerThreadTaskGroup(aThread));
158     }
159 
160     PerThreadTaskGroup& group = *mTaskGroups.LastElement();
161     group.mRegularTasks.AppendElement(r.forget());
162 
163     return NS_OK;
164   }
165 
HasTasksFor(AbstractThread * aThread)166   bool HasTasksFor(AbstractThread* aThread) override {
167     return !!GetTaskGroup(aThread) ||
168            (aThread == AbstractThread::GetCurrent() && HaveDirectTasks());
169   }
170 
DispatchTasksFor(AbstractThread * aThread)171   nsresult DispatchTasksFor(AbstractThread* aThread) override {
172     nsresult rv = NS_OK;
173 
174     // Dispatch all groups that match |aThread|.
175     for (size_t i = 0; i < mTaskGroups.Length(); ++i) {
176       if (mTaskGroups[i]->mThread == aThread) {
177         nsresult rv2 = DispatchTaskGroup(std::move(mTaskGroups[i]));
178 
179         if (NS_WARN_IF(NS_FAILED(rv2)) && NS_SUCCEEDED(rv)) {
180           // We should try our best to call DispatchTaskGroup() as much as
181           // possible and return an error if any of DispatchTaskGroup() calls
182           // failed.
183           rv = rv2;
184         }
185 
186         mTaskGroups.RemoveElementAt(i--);
187       }
188     }
189 
190     return rv;
191   }
192 
193  private:
194   struct PerThreadTaskGroup {
195    public:
PerThreadTaskGroupPerThreadTaskGroup196     explicit PerThreadTaskGroup(AbstractThread* aThread) : mThread(aThread) {
197       MOZ_COUNT_CTOR(PerThreadTaskGroup);
198     }
199 
200     MOZ_COUNTED_DTOR(PerThreadTaskGroup)
201 
202     RefPtr<AbstractThread> mThread;
203     nsTArray<nsCOMPtr<nsIRunnable>> mStateChangeTasks;
204     nsTArray<nsCOMPtr<nsIRunnable>> mRegularTasks;
205   };
206 
207   class TaskGroupRunnable : public Runnable {
208    public:
TaskGroupRunnable(UniquePtr<PerThreadTaskGroup> && aTasks)209     explicit TaskGroupRunnable(UniquePtr<PerThreadTaskGroup>&& aTasks)
210         : Runnable("AutoTaskDispatcher::TaskGroupRunnable"),
211           mTasks(std::move(aTasks)) {}
212 
Run()213     NS_IMETHOD Run() override {
214       // State change tasks get run all together before any code is run, so
215       // that all state changes are made in an atomic unit.
216       for (size_t i = 0; i < mTasks->mStateChangeTasks.Length(); ++i) {
217         mTasks->mStateChangeTasks[i]->Run();
218       }
219 
220       // Once the state changes have completed, drain any direct tasks
221       // generated by those state changes (i.e. watcher notification tasks).
222       // This needs to be outside the loop because we don't want to run code
223       // that might observe intermediate states.
224       MaybeDrainDirectTasks();
225 
226       for (size_t i = 0; i < mTasks->mRegularTasks.Length(); ++i) {
227         mTasks->mRegularTasks[i]->Run();
228 
229         // Scope direct tasks tightly to the task that generated them.
230         MaybeDrainDirectTasks();
231       }
232 
233       return NS_OK;
234     }
235 
236    private:
MaybeDrainDirectTasks()237     void MaybeDrainDirectTasks() {
238       AbstractThread* currentThread = AbstractThread::GetCurrent();
239       if (currentThread && currentThread->MightHaveTailTasks()) {
240         currentThread->TailDispatcher().DrainDirectTasks();
241       }
242     }
243 
244     UniquePtr<PerThreadTaskGroup> mTasks;
245   };
246 
EnsureTaskGroup(AbstractThread * aThread)247   PerThreadTaskGroup& EnsureTaskGroup(AbstractThread* aThread) {
248     PerThreadTaskGroup* existing = GetTaskGroup(aThread);
249     if (existing) {
250       return *existing;
251     }
252 
253     mTaskGroups.AppendElement(new PerThreadTaskGroup(aThread));
254     return *mTaskGroups.LastElement();
255   }
256 
GetTaskGroup(AbstractThread * aThread)257   PerThreadTaskGroup* GetTaskGroup(AbstractThread* aThread) {
258     for (size_t i = 0; i < mTaskGroups.Length(); ++i) {
259       if (mTaskGroups[i]->mThread == aThread) {
260         return mTaskGroups[i].get();
261       }
262     }
263 
264     // Not found.
265     return nullptr;
266   }
267 
DispatchTaskGroup(UniquePtr<PerThreadTaskGroup> aGroup)268   nsresult DispatchTaskGroup(UniquePtr<PerThreadTaskGroup> aGroup) {
269     RefPtr<AbstractThread> thread = aGroup->mThread;
270 
271     AbstractThread::DispatchReason reason =
272         mIsTailDispatcher ? AbstractThread::TailDispatch
273                           : AbstractThread::NormalDispatch;
274     nsCOMPtr<nsIRunnable> r = new TaskGroupRunnable(std::move(aGroup));
275     return thread->Dispatch(r.forget(), reason);
276   }
277 
278   // Task groups, organized by thread.
279   nsTArray<UniquePtr<PerThreadTaskGroup>> mTaskGroups;
280 
281   nsCOMPtr<nsIDirectTaskDispatcher> mDirectTaskDispatcher;
282   // True if this TaskDispatcher represents the tail dispatcher for the thread
283   // upon which it runs.
284   const bool mIsTailDispatcher;
285 };
286 
// Small helper that owns a default-constructed T and converts implicitly to
// T&. Its purpose is to let a method that takes a TaskDispatcher& declare an
// AutoTaskDispatcher as that parameter's default argument.
template <typename T>
class PassByRef {
 public:
  PassByRef() = default;

  // Exposes the owned value as a mutable reference.
  operator T&() { return mValue; }

 private:
  T mValue;
};
298 
299 }  // namespace mozilla
300 
301 #endif
302