1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4  * License, v. 2.0. If a copy of the MPL was not distributed with this
5  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 
7 #if !defined(TaskDispatcher_h_)
8 #  define TaskDispatcher_h_
9 
10 #  include "mozilla/AbstractThread.h"
11 #  include "mozilla/Maybe.h"
12 #  include "mozilla/UniquePtr.h"
13 #  include "mozilla/Unused.h"
14 
15 #  include "nsISupportsImpl.h"
16 #  include "nsTArray.h"
17 #  include "nsThreadUtils.h"
18 
19 #  include <queue>
20 
21 namespace mozilla {
22 
23 /*
24  * A classic approach to cross-thread communication is to dispatch asynchronous
25  * runnables to perform updates on other threads. This generally works well, but
26  * there are sometimes reasons why we might want to delay the actual dispatch of
27  * these tasks until a specified moment. At present, this is primarily useful to
28  * ensure that mirrored state gets updated atomically - but there may be other
29  * applications as well.
30  *
31  * TaskDispatcher is a general abstract class that accepts tasks and dispatches
32  * them at some later point. These groups of tasks are per-target-thread, and
33  * contain separate queues for several kinds of tasks (see comments  below). -
34  * "state change tasks" (which run first, and are intended to be used to update
35  * the value held by mirrors), and regular tasks, which are other arbitrary
36  * operations that the are gated to run after all the state changes have
37  * completed.
38  */
39 class TaskDispatcher {
40  public:
41   TaskDispatcher() = default;
42   virtual ~TaskDispatcher() = default;
43 
44   // Direct tasks are run directly (rather than dispatched asynchronously) when
45   // the tail dispatcher fires. A direct task may cause other tasks to be added
46   // to the tail dispatcher.
47   virtual void AddDirectTask(already_AddRefed<nsIRunnable> aRunnable) = 0;
48 
49   // State change tasks are dispatched asynchronously always run before regular
50   // tasks. They are intended to be used to update the value held by mirrors
51   // before any other dispatched tasks are run on the target thread.
52   virtual void AddStateChangeTask(AbstractThread* aThread,
53                                   already_AddRefed<nsIRunnable> aRunnable) = 0;
54 
55   // Regular tasks are dispatched asynchronously, and run after state change
56   // tasks.
57   virtual nsresult AddTask(AbstractThread* aThread,
58                            already_AddRefed<nsIRunnable> aRunnable) = 0;
59 
60   virtual nsresult DispatchTasksFor(AbstractThread* aThread) = 0;
61   virtual bool HasTasksFor(AbstractThread* aThread) = 0;
62   virtual void DrainDirectTasks() = 0;
63 };
64 
65 /*
66  * AutoTaskDispatcher is a stack-scoped TaskDispatcher implementation that fires
67  * its queued tasks when it is popped off the stack.
68  */
69 class AutoTaskDispatcher : public TaskDispatcher {
70  public:
71   explicit AutoTaskDispatcher(bool aIsTailDispatcher = false)
mIsTailDispatcher(aIsTailDispatcher)72       : mIsTailDispatcher(aIsTailDispatcher) {}
73 
~AutoTaskDispatcher()74   ~AutoTaskDispatcher() {
75     // Given that direct tasks may trigger other code that uses the tail
76     // dispatcher, it's better to avoid processing them in the tail dispatcher's
77     // destructor. So we require TailDispatchers to manually invoke
78     // DrainDirectTasks before the AutoTaskDispatcher gets destroyed. In truth,
79     // this is only necessary in the case where this AutoTaskDispatcher can be
80     // accessed by the direct tasks it dispatches (true for TailDispatchers, but
81     // potentially not true for other hypothetical AutoTaskDispatchers). Feel
82     // free to loosen this restriction to apply only to mIsTailDispatcher if a
83     // use-case requires it.
84     MOZ_ASSERT(!HaveDirectTasks());
85 
86     for (size_t i = 0; i < mTaskGroups.Length(); ++i) {
87       DispatchTaskGroup(std::move(mTaskGroups[i]));
88     }
89   }
90 
HaveDirectTasks()91   bool HaveDirectTasks() const {
92     return mDirectTasks.isSome() && !mDirectTasks->empty();
93   }
94 
DrainDirectTasks()95   void DrainDirectTasks() override {
96     while (HaveDirectTasks()) {
97       nsCOMPtr<nsIRunnable> r = mDirectTasks->front();
98       mDirectTasks->pop();
99       r->Run();
100     }
101   }
102 
AddDirectTask(already_AddRefed<nsIRunnable> aRunnable)103   void AddDirectTask(already_AddRefed<nsIRunnable> aRunnable) override {
104     if (mDirectTasks.isNothing()) {
105       mDirectTasks.emplace();
106     }
107     mDirectTasks->push(std::move(aRunnable));
108   }
109 
AddStateChangeTask(AbstractThread * aThread,already_AddRefed<nsIRunnable> aRunnable)110   void AddStateChangeTask(AbstractThread* aThread,
111                           already_AddRefed<nsIRunnable> aRunnable) override {
112     nsCOMPtr<nsIRunnable> r = aRunnable;
113     MOZ_RELEASE_ASSERT(r);
114     EnsureTaskGroup(aThread).mStateChangeTasks.AppendElement(r.forget());
115   }
116 
AddTask(AbstractThread * aThread,already_AddRefed<nsIRunnable> aRunnable)117   nsresult AddTask(AbstractThread* aThread,
118                    already_AddRefed<nsIRunnable> aRunnable) override {
119     nsCOMPtr<nsIRunnable> r = aRunnable;
120     MOZ_RELEASE_ASSERT(r);
121     // To preserve the event order, we need to append a new group if the last
122     // group is not targeted for |aThread|.
123     // See https://bugzilla.mozilla.org/show_bug.cgi?id=1318226&mark=0-3#c0
124     // for the details of the issue.
125     if (mTaskGroups.Length() == 0 ||
126         mTaskGroups.LastElement()->mThread != aThread) {
127       mTaskGroups.AppendElement(new PerThreadTaskGroup(aThread));
128     }
129 
130     PerThreadTaskGroup& group = *mTaskGroups.LastElement();
131     group.mRegularTasks.AppendElement(r.forget());
132 
133     return NS_OK;
134   }
135 
HasTasksFor(AbstractThread * aThread)136   bool HasTasksFor(AbstractThread* aThread) override {
137     return !!GetTaskGroup(aThread) ||
138            (aThread == AbstractThread::GetCurrent() && HaveDirectTasks());
139   }
140 
DispatchTasksFor(AbstractThread * aThread)141   nsresult DispatchTasksFor(AbstractThread* aThread) override {
142     nsresult rv = NS_OK;
143 
144     // Dispatch all groups that match |aThread|.
145     for (size_t i = 0; i < mTaskGroups.Length(); ++i) {
146       if (mTaskGroups[i]->mThread == aThread) {
147         nsresult rv2 = DispatchTaskGroup(std::move(mTaskGroups[i]));
148 
149         if (NS_WARN_IF(NS_FAILED(rv2)) && NS_SUCCEEDED(rv)) {
150           // We should try our best to call DispatchTaskGroup() as much as
151           // possible and return an error if any of DispatchTaskGroup() calls
152           // failed.
153           rv = rv2;
154         }
155 
156         mTaskGroups.RemoveElementAt(i--);
157       }
158     }
159 
160     return rv;
161   }
162 
163  private:
164   struct PerThreadTaskGroup {
165    public:
PerThreadTaskGroupPerThreadTaskGroup166     explicit PerThreadTaskGroup(AbstractThread* aThread) : mThread(aThread) {
167       MOZ_COUNT_CTOR(PerThreadTaskGroup);
168     }
169 
170     MOZ_COUNTED_DTOR(PerThreadTaskGroup)
171 
172     RefPtr<AbstractThread> mThread;
173     nsTArray<nsCOMPtr<nsIRunnable>> mStateChangeTasks;
174     nsTArray<nsCOMPtr<nsIRunnable>> mRegularTasks;
175   };
176 
177   class TaskGroupRunnable : public Runnable {
178    public:
TaskGroupRunnable(UniquePtr<PerThreadTaskGroup> && aTasks)179     explicit TaskGroupRunnable(UniquePtr<PerThreadTaskGroup>&& aTasks)
180         : Runnable("AutoTaskDispatcher::TaskGroupRunnable"),
181           mTasks(std::move(aTasks)) {}
182 
Run()183     NS_IMETHOD Run() override {
184       // State change tasks get run all together before any code is run, so
185       // that all state changes are made in an atomic unit.
186       for (size_t i = 0; i < mTasks->mStateChangeTasks.Length(); ++i) {
187         mTasks->mStateChangeTasks[i]->Run();
188       }
189 
190       // Once the state changes have completed, drain any direct tasks
191       // generated by those state changes (i.e. watcher notification tasks).
192       // This needs to be outside the loop because we don't want to run code
193       // that might observe intermediate states.
194       MaybeDrainDirectTasks();
195 
196       for (size_t i = 0; i < mTasks->mRegularTasks.Length(); ++i) {
197         mTasks->mRegularTasks[i]->Run();
198 
199         // Scope direct tasks tightly to the task that generated them.
200         MaybeDrainDirectTasks();
201       }
202 
203       return NS_OK;
204     }
205 
206    private:
MaybeDrainDirectTasks()207     void MaybeDrainDirectTasks() {
208       AbstractThread* currentThread = AbstractThread::GetCurrent();
209       if (currentThread && currentThread->MightHaveTailTasks()) {
210         currentThread->TailDispatcher().DrainDirectTasks();
211       }
212     }
213 
214     UniquePtr<PerThreadTaskGroup> mTasks;
215   };
216 
EnsureTaskGroup(AbstractThread * aThread)217   PerThreadTaskGroup& EnsureTaskGroup(AbstractThread* aThread) {
218     PerThreadTaskGroup* existing = GetTaskGroup(aThread);
219     if (existing) {
220       return *existing;
221     }
222 
223     mTaskGroups.AppendElement(new PerThreadTaskGroup(aThread));
224     return *mTaskGroups.LastElement();
225   }
226 
GetTaskGroup(AbstractThread * aThread)227   PerThreadTaskGroup* GetTaskGroup(AbstractThread* aThread) {
228     for (size_t i = 0; i < mTaskGroups.Length(); ++i) {
229       if (mTaskGroups[i]->mThread == aThread) {
230         return mTaskGroups[i].get();
231       }
232     }
233 
234     // Not found.
235     return nullptr;
236   }
237 
DispatchTaskGroup(UniquePtr<PerThreadTaskGroup> aGroup)238   nsresult DispatchTaskGroup(UniquePtr<PerThreadTaskGroup> aGroup) {
239     RefPtr<AbstractThread> thread = aGroup->mThread;
240 
241     AbstractThread::DispatchReason reason =
242         mIsTailDispatcher ? AbstractThread::TailDispatch
243                           : AbstractThread::NormalDispatch;
244     nsCOMPtr<nsIRunnable> r = new TaskGroupRunnable(std::move(aGroup));
245     return thread->Dispatch(r.forget(), reason);
246   }
247 
248   // Direct tasks. We use a Maybe<> because (a) this class is hot, (b)
249   // mDirectTasks often doesn't get anything put into it, and (c) the
250   // std::queue implementation in GNU libstdc++ does two largish heap
251   // allocations when creating a new std::queue.
252   mozilla::Maybe<std::queue<nsCOMPtr<nsIRunnable>>> mDirectTasks;
253 
254   // Task groups, organized by thread.
255   nsTArray<UniquePtr<PerThreadTaskGroup>> mTaskGroups;
256 
257   // True if this TaskDispatcher represents the tail dispatcher for the thread
258   // upon which it runs.
259   const bool mIsTailDispatcher;
260 };
261 
262 // Little utility class to allow declaring AutoTaskDispatcher as a default
263 // parameter for methods that take a TaskDispatcher&.
264 template <typename T>
265 class PassByRef {
266  public:
267   PassByRef() = default;
268   operator T&() { return mVal; }
269 
270  private:
271   T mVal;
272 };
273 
274 }  // namespace mozilla
275 
276 #endif
277