1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this 5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #if !defined(TaskDispatcher_h_) 8 # define TaskDispatcher_h_ 9 10 # include "mozilla/AbstractThread.h" 11 # include "mozilla/Maybe.h" 12 # include "mozilla/UniquePtr.h" 13 # include "mozilla/Unused.h" 14 15 # include "nsISupportsImpl.h" 16 # include "nsTArray.h" 17 # include "nsThreadUtils.h" 18 19 # include <queue> 20 21 namespace mozilla { 22 23 /* 24 * A classic approach to cross-thread communication is to dispatch asynchronous 25 * runnables to perform updates on other threads. This generally works well, but 26 * there are sometimes reasons why we might want to delay the actual dispatch of 27 * these tasks until a specified moment. At present, this is primarily useful to 28 * ensure that mirrored state gets updated atomically - but there may be other 29 * applications as well. 30 * 31 * TaskDispatcher is a general abstract class that accepts tasks and dispatches 32 * them at some later point. These groups of tasks are per-target-thread, and 33 * contain separate queues for several kinds of tasks (see comments below). - 34 * "state change tasks" (which run first, and are intended to be used to update 35 * the value held by mirrors), and regular tasks, which are other arbitrary 36 * operations that the are gated to run after all the state changes have 37 * completed. 38 */ 39 class TaskDispatcher { 40 public: 41 TaskDispatcher() = default; 42 virtual ~TaskDispatcher() = default; 43 44 // Direct tasks are run directly (rather than dispatched asynchronously) when 45 // the tail dispatcher fires. A direct task may cause other tasks to be added 46 // to the tail dispatcher. 47 virtual void AddDirectTask(already_AddRefed<nsIRunnable> aRunnable) = 0; 48 49 // State change tasks are dispatched asynchronously always run before regular 50 // tasks. They are intended to be used to update the value held by mirrors 51 // before any other dispatched tasks are run on the target thread. 52 virtual void AddStateChangeTask(AbstractThread* aThread, 53 already_AddRefed<nsIRunnable> aRunnable) = 0; 54 55 // Regular tasks are dispatched asynchronously, and run after state change 56 // tasks. 57 virtual nsresult AddTask(AbstractThread* aThread, 58 already_AddRefed<nsIRunnable> aRunnable) = 0; 59 60 virtual nsresult DispatchTasksFor(AbstractThread* aThread) = 0; 61 virtual bool HasTasksFor(AbstractThread* aThread) = 0; 62 virtual void DrainDirectTasks() = 0; 63 }; 64 65 /* 66 * AutoTaskDispatcher is a stack-scoped TaskDispatcher implementation that fires 67 * its queued tasks when it is popped off the stack. 68 */ 69 class AutoTaskDispatcher : public TaskDispatcher { 70 public: 71 explicit AutoTaskDispatcher(bool aIsTailDispatcher = false) mIsTailDispatcher(aIsTailDispatcher)72 : mIsTailDispatcher(aIsTailDispatcher) {} 73 ~AutoTaskDispatcher()74 ~AutoTaskDispatcher() { 75 // Given that direct tasks may trigger other code that uses the tail 76 // dispatcher, it's better to avoid processing them in the tail dispatcher's 77 // destructor. So we require TailDispatchers to manually invoke 78 // DrainDirectTasks before the AutoTaskDispatcher gets destroyed. In truth, 79 // this is only necessary in the case where this AutoTaskDispatcher can be 80 // accessed by the direct tasks it dispatches (true for TailDispatchers, but 81 // potentially not true for other hypothetical AutoTaskDispatchers). Feel 82 // free to loosen this restriction to apply only to mIsTailDispatcher if a 83 // use-case requires it. 84 MOZ_ASSERT(!HaveDirectTasks()); 85 86 for (size_t i = 0; i < mTaskGroups.Length(); ++i) { 87 DispatchTaskGroup(std::move(mTaskGroups[i])); 88 } 89 } 90 HaveDirectTasks()91 bool HaveDirectTasks() const { 92 return mDirectTasks.isSome() && !mDirectTasks->empty(); 93 } 94 DrainDirectTasks()95 void DrainDirectTasks() override { 96 while (HaveDirectTasks()) { 97 nsCOMPtr<nsIRunnable> r = mDirectTasks->front(); 98 mDirectTasks->pop(); 99 r->Run(); 100 } 101 } 102 AddDirectTask(already_AddRefed<nsIRunnable> aRunnable)103 void AddDirectTask(already_AddRefed<nsIRunnable> aRunnable) override { 104 if (mDirectTasks.isNothing()) { 105 mDirectTasks.emplace(); 106 } 107 mDirectTasks->push(std::move(aRunnable)); 108 } 109 AddStateChangeTask(AbstractThread * aThread,already_AddRefed<nsIRunnable> aRunnable)110 void AddStateChangeTask(AbstractThread* aThread, 111 already_AddRefed<nsIRunnable> aRunnable) override { 112 nsCOMPtr<nsIRunnable> r = aRunnable; 113 MOZ_RELEASE_ASSERT(r); 114 EnsureTaskGroup(aThread).mStateChangeTasks.AppendElement(r.forget()); 115 } 116 AddTask(AbstractThread * aThread,already_AddRefed<nsIRunnable> aRunnable)117 nsresult AddTask(AbstractThread* aThread, 118 already_AddRefed<nsIRunnable> aRunnable) override { 119 nsCOMPtr<nsIRunnable> r = aRunnable; 120 MOZ_RELEASE_ASSERT(r); 121 // To preserve the event order, we need to append a new group if the last 122 // group is not targeted for |aThread|. 123 // See https://bugzilla.mozilla.org/show_bug.cgi?id=1318226&mark=0-3#c0 124 // for the details of the issue. 125 if (mTaskGroups.Length() == 0 || 126 mTaskGroups.LastElement()->mThread != aThread) { 127 mTaskGroups.AppendElement(new PerThreadTaskGroup(aThread)); 128 } 129 130 PerThreadTaskGroup& group = *mTaskGroups.LastElement(); 131 group.mRegularTasks.AppendElement(r.forget()); 132 133 return NS_OK; 134 } 135 HasTasksFor(AbstractThread * aThread)136 bool HasTasksFor(AbstractThread* aThread) override { 137 return !!GetTaskGroup(aThread) || 138 (aThread == AbstractThread::GetCurrent() && HaveDirectTasks()); 139 } 140 DispatchTasksFor(AbstractThread * aThread)141 nsresult DispatchTasksFor(AbstractThread* aThread) override { 142 nsresult rv = NS_OK; 143 144 // Dispatch all groups that match |aThread|. 145 for (size_t i = 0; i < mTaskGroups.Length(); ++i) { 146 if (mTaskGroups[i]->mThread == aThread) { 147 nsresult rv2 = DispatchTaskGroup(std::move(mTaskGroups[i])); 148 149 if (NS_WARN_IF(NS_FAILED(rv2)) && NS_SUCCEEDED(rv)) { 150 // We should try our best to call DispatchTaskGroup() as much as 151 // possible and return an error if any of DispatchTaskGroup() calls 152 // failed. 153 rv = rv2; 154 } 155 156 mTaskGroups.RemoveElementAt(i--); 157 } 158 } 159 160 return rv; 161 } 162 163 private: 164 struct PerThreadTaskGroup { 165 public: PerThreadTaskGroupPerThreadTaskGroup166 explicit PerThreadTaskGroup(AbstractThread* aThread) : mThread(aThread) { 167 MOZ_COUNT_CTOR(PerThreadTaskGroup); 168 } 169 170 MOZ_COUNTED_DTOR(PerThreadTaskGroup) 171 172 RefPtr<AbstractThread> mThread; 173 nsTArray<nsCOMPtr<nsIRunnable>> mStateChangeTasks; 174 nsTArray<nsCOMPtr<nsIRunnable>> mRegularTasks; 175 }; 176 177 class TaskGroupRunnable : public Runnable { 178 public: TaskGroupRunnable(UniquePtr<PerThreadTaskGroup> && aTasks)179 explicit TaskGroupRunnable(UniquePtr<PerThreadTaskGroup>&& aTasks) 180 : Runnable("AutoTaskDispatcher::TaskGroupRunnable"), 181 mTasks(std::move(aTasks)) {} 182 Run()183 NS_IMETHOD Run() override { 184 // State change tasks get run all together before any code is run, so 185 // that all state changes are made in an atomic unit. 186 for (size_t i = 0; i < mTasks->mStateChangeTasks.Length(); ++i) { 187 mTasks->mStateChangeTasks[i]->Run(); 188 } 189 190 // Once the state changes have completed, drain any direct tasks 191 // generated by those state changes (i.e. watcher notification tasks). 192 // This needs to be outside the loop because we don't want to run code 193 // that might observe intermediate states. 194 MaybeDrainDirectTasks(); 195 196 for (size_t i = 0; i < mTasks->mRegularTasks.Length(); ++i) { 197 mTasks->mRegularTasks[i]->Run(); 198 199 // Scope direct tasks tightly to the task that generated them. 200 MaybeDrainDirectTasks(); 201 } 202 203 return NS_OK; 204 } 205 206 private: MaybeDrainDirectTasks()207 void MaybeDrainDirectTasks() { 208 AbstractThread* currentThread = AbstractThread::GetCurrent(); 209 if (currentThread && currentThread->MightHaveTailTasks()) { 210 currentThread->TailDispatcher().DrainDirectTasks(); 211 } 212 } 213 214 UniquePtr<PerThreadTaskGroup> mTasks; 215 }; 216 EnsureTaskGroup(AbstractThread * aThread)217 PerThreadTaskGroup& EnsureTaskGroup(AbstractThread* aThread) { 218 PerThreadTaskGroup* existing = GetTaskGroup(aThread); 219 if (existing) { 220 return *existing; 221 } 222 223 mTaskGroups.AppendElement(new PerThreadTaskGroup(aThread)); 224 return *mTaskGroups.LastElement(); 225 } 226 GetTaskGroup(AbstractThread * aThread)227 PerThreadTaskGroup* GetTaskGroup(AbstractThread* aThread) { 228 for (size_t i = 0; i < mTaskGroups.Length(); ++i) { 229 if (mTaskGroups[i]->mThread == aThread) { 230 return mTaskGroups[i].get(); 231 } 232 } 233 234 // Not found. 235 return nullptr; 236 } 237 DispatchTaskGroup(UniquePtr<PerThreadTaskGroup> aGroup)238 nsresult DispatchTaskGroup(UniquePtr<PerThreadTaskGroup> aGroup) { 239 RefPtr<AbstractThread> thread = aGroup->mThread; 240 241 AbstractThread::DispatchReason reason = 242 mIsTailDispatcher ? AbstractThread::TailDispatch 243 : AbstractThread::NormalDispatch; 244 nsCOMPtr<nsIRunnable> r = new TaskGroupRunnable(std::move(aGroup)); 245 return thread->Dispatch(r.forget(), reason); 246 } 247 248 // Direct tasks. We use a Maybe<> because (a) this class is hot, (b) 249 // mDirectTasks often doesn't get anything put into it, and (c) the 250 // std::queue implementation in GNU libstdc++ does two largish heap 251 // allocations when creating a new std::queue. 252 mozilla::Maybe<std::queue<nsCOMPtr<nsIRunnable>>> mDirectTasks; 253 254 // Task groups, organized by thread. 255 nsTArray<UniquePtr<PerThreadTaskGroup>> mTaskGroups; 256 257 // True if this TaskDispatcher represents the tail dispatcher for the thread 258 // upon which it runs. 259 const bool mIsTailDispatcher; 260 }; 261 262 // Little utility class to allow declaring AutoTaskDispatcher as a default 263 // parameter for methods that take a TaskDispatcher&. 264 template <typename T> 265 class PassByRef { 266 public: 267 PassByRef() = default; 268 operator T&() { return mVal; } 269 270 private: 271 T mVal; 272 }; 273 274 } // namespace mozilla 275 276 #endif 277