1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this 5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #if !defined(TaskDispatcher_h_) 8 # define TaskDispatcher_h_ 9 10 # include <queue> 11 12 # include "mozilla/AbstractThread.h" 13 # include "mozilla/Maybe.h" 14 # include "mozilla/UniquePtr.h" 15 # include "nsIDirectTaskDispatcher.h" 16 # include "nsISupportsImpl.h" 17 # include "nsTArray.h" 18 # include "nsThreadUtils.h" 19 20 namespace mozilla { 21 22 class SimpleTaskQueue { 23 public: 24 SimpleTaskQueue() = default; 25 virtual ~SimpleTaskQueue() = default; 26 AddTask(already_AddRefed<nsIRunnable> aRunnable)27 void AddTask(already_AddRefed<nsIRunnable> aRunnable) { 28 if (!mTasks) { 29 mTasks.emplace(); 30 } 31 mTasks->push(std::move(aRunnable)); 32 } 33 DrainTasks()34 void DrainTasks() { 35 if (!mTasks) { 36 return; 37 } 38 auto& queue = mTasks.ref(); 39 while (!queue.empty()) { 40 nsCOMPtr<nsIRunnable> r = std::move(queue.front()); 41 queue.pop(); 42 r->Run(); 43 } 44 } 45 HaveTasks()46 bool HaveTasks() const { return mTasks && !mTasks->empty(); } 47 48 private: 49 // We use a Maybe<> because (a) when used for DirectTasks it often doesn't get 50 // anything put into it, and (b) the std::queue implementation in GNU 51 // libstdc++ does two largish heap allocations when creating a new std::queue. 52 Maybe<std::queue<nsCOMPtr<nsIRunnable>>> mTasks; 53 }; 54 55 /* 56 * A classic approach to cross-thread communication is to dispatch asynchronous 57 * runnables to perform updates on other threads. This generally works well, but 58 * there are sometimes reasons why we might want to delay the actual dispatch of 59 * these tasks until a specified moment. At present, this is primarily useful to 60 * ensure that mirrored state gets updated atomically - but there may be other 61 * applications as well. 62 * 63 * TaskDispatcher is a general abstract class that accepts tasks and dispatches 64 * them at some later point. These groups of tasks are per-target-thread, and 65 * contain separate queues for several kinds of tasks (see comments below). - 66 * "state change tasks" (which run first, and are intended to be used to update 67 * the value held by mirrors), and regular tasks, which are other arbitrary 68 * operations that the are gated to run after all the state changes have 69 * completed. 70 */ 71 class TaskDispatcher { 72 public: 73 TaskDispatcher() = default; 74 virtual ~TaskDispatcher() = default; 75 76 // Direct tasks are run directly (rather than dispatched asynchronously) when 77 // the tail dispatcher fires. A direct task may cause other tasks to be added 78 // to the tail dispatcher. 79 virtual void AddDirectTask(already_AddRefed<nsIRunnable> aRunnable) = 0; 80 81 // State change tasks are dispatched asynchronously always run before regular 82 // tasks. They are intended to be used to update the value held by mirrors 83 // before any other dispatched tasks are run on the target thread. 84 virtual void AddStateChangeTask(AbstractThread* aThread, 85 already_AddRefed<nsIRunnable> aRunnable) = 0; 86 87 // Regular tasks are dispatched asynchronously, and run after state change 88 // tasks. 89 virtual nsresult AddTask(AbstractThread* aThread, 90 already_AddRefed<nsIRunnable> aRunnable) = 0; 91 92 virtual nsresult DispatchTasksFor(AbstractThread* aThread) = 0; 93 virtual bool HasTasksFor(AbstractThread* aThread) = 0; 94 virtual void DrainDirectTasks() = 0; 95 }; 96 97 /* 98 * AutoTaskDispatcher is a stack-scoped TaskDispatcher implementation that fires 99 * its queued tasks when it is popped off the stack. 100 */ 101 class AutoTaskDispatcher : public TaskDispatcher { 102 public: 103 explicit AutoTaskDispatcher(nsIDirectTaskDispatcher* aDirectTaskDispatcher, 104 bool aIsTailDispatcher = false) mDirectTaskDispatcher(aDirectTaskDispatcher)105 : mDirectTaskDispatcher(aDirectTaskDispatcher), 106 mIsTailDispatcher(aIsTailDispatcher) {} 107 ~AutoTaskDispatcher()108 ~AutoTaskDispatcher() { 109 // Given that direct tasks may trigger other code that uses the tail 110 // dispatcher, it's better to avoid processing them in the tail dispatcher's 111 // destructor. So we require TailDispatchers to manually invoke 112 // DrainDirectTasks before the AutoTaskDispatcher gets destroyed. In truth, 113 // this is only necessary in the case where this AutoTaskDispatcher can be 114 // accessed by the direct tasks it dispatches (true for TailDispatchers, but 115 // potentially not true for other hypothetical AutoTaskDispatchers). Feel 116 // free to loosen this restriction to apply only to mIsTailDispatcher if a 117 // use-case requires it. 118 MOZ_ASSERT(!HaveDirectTasks()); 119 120 for (size_t i = 0; i < mTaskGroups.Length(); ++i) { 121 DispatchTaskGroup(std::move(mTaskGroups[i])); 122 } 123 } 124 HaveDirectTasks()125 bool HaveDirectTasks() { 126 return mDirectTaskDispatcher && mDirectTaskDispatcher->HaveDirectTasks(); 127 } 128 DrainDirectTasks()129 void DrainDirectTasks() override { 130 if (mDirectTaskDispatcher) { 131 mDirectTaskDispatcher->DrainDirectTasks(); 132 } 133 } 134 AddDirectTask(already_AddRefed<nsIRunnable> aRunnable)135 void AddDirectTask(already_AddRefed<nsIRunnable> aRunnable) override { 136 MOZ_ASSERT(mDirectTaskDispatcher); 137 mDirectTaskDispatcher->DispatchDirectTask(std::move(aRunnable)); 138 } 139 AddStateChangeTask(AbstractThread * aThread,already_AddRefed<nsIRunnable> aRunnable)140 void AddStateChangeTask(AbstractThread* aThread, 141 already_AddRefed<nsIRunnable> aRunnable) override { 142 nsCOMPtr<nsIRunnable> r = aRunnable; 143 MOZ_RELEASE_ASSERT(r); 144 EnsureTaskGroup(aThread).mStateChangeTasks.AppendElement(r.forget()); 145 } 146 AddTask(AbstractThread * aThread,already_AddRefed<nsIRunnable> aRunnable)147 nsresult AddTask(AbstractThread* aThread, 148 already_AddRefed<nsIRunnable> aRunnable) override { 149 nsCOMPtr<nsIRunnable> r = aRunnable; 150 MOZ_RELEASE_ASSERT(r); 151 // To preserve the event order, we need to append a new group if the last 152 // group is not targeted for |aThread|. 153 // See https://bugzilla.mozilla.org/show_bug.cgi?id=1318226&mark=0-3#c0 154 // for the details of the issue. 155 if (mTaskGroups.Length() == 0 || 156 mTaskGroups.LastElement()->mThread != aThread) { 157 mTaskGroups.AppendElement(new PerThreadTaskGroup(aThread)); 158 } 159 160 PerThreadTaskGroup& group = *mTaskGroups.LastElement(); 161 group.mRegularTasks.AppendElement(r.forget()); 162 163 return NS_OK; 164 } 165 HasTasksFor(AbstractThread * aThread)166 bool HasTasksFor(AbstractThread* aThread) override { 167 return !!GetTaskGroup(aThread) || 168 (aThread == AbstractThread::GetCurrent() && HaveDirectTasks()); 169 } 170 DispatchTasksFor(AbstractThread * aThread)171 nsresult DispatchTasksFor(AbstractThread* aThread) override { 172 nsresult rv = NS_OK; 173 174 // Dispatch all groups that match |aThread|. 175 for (size_t i = 0; i < mTaskGroups.Length(); ++i) { 176 if (mTaskGroups[i]->mThread == aThread) { 177 nsresult rv2 = DispatchTaskGroup(std::move(mTaskGroups[i])); 178 179 if (NS_WARN_IF(NS_FAILED(rv2)) && NS_SUCCEEDED(rv)) { 180 // We should try our best to call DispatchTaskGroup() as much as 181 // possible and return an error if any of DispatchTaskGroup() calls 182 // failed. 183 rv = rv2; 184 } 185 186 mTaskGroups.RemoveElementAt(i--); 187 } 188 } 189 190 return rv; 191 } 192 193 private: 194 struct PerThreadTaskGroup { 195 public: PerThreadTaskGroupPerThreadTaskGroup196 explicit PerThreadTaskGroup(AbstractThread* aThread) : mThread(aThread) { 197 MOZ_COUNT_CTOR(PerThreadTaskGroup); 198 } 199 200 MOZ_COUNTED_DTOR(PerThreadTaskGroup) 201 202 RefPtr<AbstractThread> mThread; 203 nsTArray<nsCOMPtr<nsIRunnable>> mStateChangeTasks; 204 nsTArray<nsCOMPtr<nsIRunnable>> mRegularTasks; 205 }; 206 207 class TaskGroupRunnable : public Runnable { 208 public: TaskGroupRunnable(UniquePtr<PerThreadTaskGroup> && aTasks)209 explicit TaskGroupRunnable(UniquePtr<PerThreadTaskGroup>&& aTasks) 210 : Runnable("AutoTaskDispatcher::TaskGroupRunnable"), 211 mTasks(std::move(aTasks)) {} 212 Run()213 NS_IMETHOD Run() override { 214 // State change tasks get run all together before any code is run, so 215 // that all state changes are made in an atomic unit. 216 for (size_t i = 0; i < mTasks->mStateChangeTasks.Length(); ++i) { 217 mTasks->mStateChangeTasks[i]->Run(); 218 } 219 220 // Once the state changes have completed, drain any direct tasks 221 // generated by those state changes (i.e. watcher notification tasks). 222 // This needs to be outside the loop because we don't want to run code 223 // that might observe intermediate states. 224 MaybeDrainDirectTasks(); 225 226 for (size_t i = 0; i < mTasks->mRegularTasks.Length(); ++i) { 227 mTasks->mRegularTasks[i]->Run(); 228 229 // Scope direct tasks tightly to the task that generated them. 230 MaybeDrainDirectTasks(); 231 } 232 233 return NS_OK; 234 } 235 236 private: MaybeDrainDirectTasks()237 void MaybeDrainDirectTasks() { 238 AbstractThread* currentThread = AbstractThread::GetCurrent(); 239 if (currentThread && currentThread->MightHaveTailTasks()) { 240 currentThread->TailDispatcher().DrainDirectTasks(); 241 } 242 } 243 244 UniquePtr<PerThreadTaskGroup> mTasks; 245 }; 246 EnsureTaskGroup(AbstractThread * aThread)247 PerThreadTaskGroup& EnsureTaskGroup(AbstractThread* aThread) { 248 PerThreadTaskGroup* existing = GetTaskGroup(aThread); 249 if (existing) { 250 return *existing; 251 } 252 253 mTaskGroups.AppendElement(new PerThreadTaskGroup(aThread)); 254 return *mTaskGroups.LastElement(); 255 } 256 GetTaskGroup(AbstractThread * aThread)257 PerThreadTaskGroup* GetTaskGroup(AbstractThread* aThread) { 258 for (size_t i = 0; i < mTaskGroups.Length(); ++i) { 259 if (mTaskGroups[i]->mThread == aThread) { 260 return mTaskGroups[i].get(); 261 } 262 } 263 264 // Not found. 265 return nullptr; 266 } 267 DispatchTaskGroup(UniquePtr<PerThreadTaskGroup> aGroup)268 nsresult DispatchTaskGroup(UniquePtr<PerThreadTaskGroup> aGroup) { 269 RefPtr<AbstractThread> thread = aGroup->mThread; 270 271 AbstractThread::DispatchReason reason = 272 mIsTailDispatcher ? AbstractThread::TailDispatch 273 : AbstractThread::NormalDispatch; 274 nsCOMPtr<nsIRunnable> r = new TaskGroupRunnable(std::move(aGroup)); 275 return thread->Dispatch(r.forget(), reason); 276 } 277 278 // Task groups, organized by thread. 279 nsTArray<UniquePtr<PerThreadTaskGroup>> mTaskGroups; 280 281 nsCOMPtr<nsIDirectTaskDispatcher> mDirectTaskDispatcher; 282 // True if this TaskDispatcher represents the tail dispatcher for the thread 283 // upon which it runs. 284 const bool mIsTailDispatcher; 285 }; 286 287 // Little utility class to allow declaring AutoTaskDispatcher as a default 288 // parameter for methods that take a TaskDispatcher&. 289 template <typename T> 290 class PassByRef { 291 public: 292 PassByRef() = default; 293 operator T&() { return mVal; } 294 295 private: 296 T mVal; 297 }; 298 299 } // namespace mozilla 300 301 #endif 302