1 // Copyright 2019 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_TASK_THREAD_POOL_TASK_SOURCE_H_ 6 #define BASE_TASK_THREAD_POOL_TASK_SOURCE_H_ 7 8 #include <stddef.h> 9 10 #include "base/base_export.h" 11 #include "base/compiler_specific.h" 12 #include "base/memory/ref_counted.h" 13 #include "base/optional.h" 14 #include "base/sequence_token.h" 15 #include "base/task/common/checked_lock.h" 16 #include "base/task/common/intrusive_heap.h" 17 #include "base/task/task_traits.h" 18 #include "base/task/thread_pool/task.h" 19 #include "base/task/thread_pool/task_source_sort_key.h" 20 #include "base/threading/sequence_local_storage_map.h" 21 22 namespace base { 23 namespace internal { 24 25 class TaskTracker; 26 27 enum class TaskSourceExecutionMode { 28 kParallel, 29 kSequenced, 30 kSingleThread, 31 kJob, 32 kMax = kJob, 33 }; 34 35 struct BASE_EXPORT ExecutionEnvironment { 36 SequenceToken token; 37 SequenceLocalStorageMap* sequence_local_storage; 38 }; 39 40 // A TaskSource is a virtual class that provides a series of Tasks that must be 41 // executed. 42 // 43 // A task source is registered when it's ready to be queued. A task source is 44 // ready to be queued when either: 45 // 1- It has new tasks that can run concurrently as a result of external 46 // operations, e.g. posting a new task to an empty Sequence or increasing 47 // max concurrency of a JobTaskSource; 48 // 2- A worker finished running a task from it and DidProcessTask() returned 49 // true; or 50 // 3- A worker is about to run a task from it and WillRunTask() returned 51 // kAllowedNotSaturated. 52 // 53 // A worker may perform the following sequence of operations on a 54 // RegisteredTaskSource after obtaining it from the queue: 55 // 1- Check whether a task can run with WillRunTask() (and register/enqueue the 56 // task source again if not saturated). 57 // 2- (optional) Iff (1) determined that a task can run, access the next task 58 // with TakeTask(). 59 // 3- (optional) Execute the task. 60 // 4- Inform the task source that a task was processed with DidProcessTask(), 61 // and re-enqueue the task source iff requested. 62 // When a task source is registered multiple times, many overlapping chains of 63 // operations may run concurrently, as permitted by WillRunTask(). This allows 64 // tasks from the same task source to run in parallel. 65 // However, the following invariants are kept: 66 // - The number of workers concurrently running tasks never goes over the 67 // intended concurrency. 68 // - If the task source has more tasks that can run concurrently, it must be 69 // queued. 70 // 71 // Note: there is a known refcounted-ownership cycle in the ThreadPool 72 // architecture: TaskSource -> TaskRunner -> TaskSource -> ... This is okay so 73 // long as the other owners of TaskSource (PriorityQueue and WorkerThread in 74 // alternation and ThreadGroupImpl::WorkerThreadDelegateImpl::GetWork() 75 // temporarily) keep running it (and taking Tasks from it as a result). A 76 // dangling reference cycle would only occur should they release their reference 77 // to it while it's not empty. In other words, it is only correct for them to 78 // release it when DidProcessTask() returns false. 79 // 80 // This class is thread-safe. 81 class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> { 82 public: 83 // Indicates whether WillRunTask() allows TakeTask() to be called on a 84 // RegisteredTaskSource. 85 enum class RunStatus { 86 // TakeTask() cannot be called. 87 kDisallowed, 88 // TakeTask() may called, and the TaskSource has not reached its maximum 89 // concurrency (i.e. the TaskSource still needs to be queued). 90 kAllowedNotSaturated, 91 // TakeTask() may called, and the TaskSource has reached its maximum 92 // concurrency (i.e. the TaskSource no longer needs to be queued). 93 kAllowedSaturated, 94 }; 95 96 // A Transaction can perform multiple operations atomically on a 97 // TaskSource. While a Transaction is alive, it is guaranteed that nothing 98 // else will access the TaskSource; the TaskSource's lock is held for the 99 // lifetime of the Transaction. 100 class BASE_EXPORT Transaction { 101 public: 102 Transaction(Transaction&& other); 103 Transaction(const Transaction&) = delete; 104 Transaction& operator=(const Transaction&) = delete; 105 ~Transaction(); 106 107 operator bool() const { return !!task_source_; } 108 109 // Sets TaskSource priority to |priority|. 110 void UpdatePriority(TaskPriority priority); 111 112 // Returns the traits of all Tasks in the TaskSource. traits()113 TaskTraits traits() const { return task_source_->traits_; } 114 task_source()115 TaskSource* task_source() const { return task_source_; } 116 117 protected: 118 explicit Transaction(TaskSource* task_source); 119 120 private: 121 friend class TaskSource; 122 123 TaskSource* task_source_; 124 }; 125 126 // |traits| is metadata that applies to all Tasks in the TaskSource. 127 // |task_runner| is a reference to the TaskRunner feeding this TaskSource. 128 // |task_runner| can be nullptr only for tasks with no TaskRunner, in which 129 // case |execution_mode| must be kParallel. Otherwise, |execution_mode| is the 130 // execution mode of |task_runner|. 131 TaskSource(const TaskTraits& traits, 132 TaskRunner* task_runner, 133 TaskSourceExecutionMode execution_mode); 134 TaskSource(const TaskSource&) = delete; 135 TaskSource& operator=(const TaskSource&) = delete; 136 137 // Begins a Transaction. This method cannot be called on a thread which has an 138 // active TaskSource::Transaction. 139 Transaction BeginTransaction() WARN_UNUSED_RESULT; 140 141 virtual ExecutionEnvironment GetExecutionEnvironment() = 0; 142 143 // Thread-safe but the returned value may immediately be obsolete. As such 144 // this should only be used as a best-effort guess of how many more workers 145 // are needed. This may be called on an empty task source. 146 virtual size_t GetRemainingConcurrency() const = 0; 147 148 // Returns a TaskSourceSortKey representing the priority of the TaskSource. 149 virtual TaskSourceSortKey GetSortKey(bool disable_fair_scheduling) const = 0; 150 151 // Support for IntrusiveHeap. 152 void SetHeapHandle(const HeapHandle& handle); 153 void ClearHeapHandle(); GetHeapHandle()154 HeapHandle GetHeapHandle() const { return heap_handle_; } 155 heap_handle()156 HeapHandle heap_handle() const { return heap_handle_; } 157 158 // Returns the shutdown behavior of all Tasks in the TaskSource. Can be 159 // accessed without a Transaction because it is never mutated. shutdown_behavior()160 TaskShutdownBehavior shutdown_behavior() const { 161 return traits_.shutdown_behavior(); 162 } 163 // Returns a racy priority of the TaskSource. Can be accessed without a 164 // Transaction but may return an outdated result. priority_racy()165 TaskPriority priority_racy() const { 166 return priority_racy_.load(std::memory_order_relaxed); 167 } 168 // Returns the thread policy of the TaskSource. Can be accessed without a 169 // Transaction because it is never mutated. thread_policy()170 ThreadPolicy thread_policy() const { return traits_.thread_policy(); } 171 172 // A reference to TaskRunner is only retained between PushTask() and when 173 // DidProcessTask() returns false, guaranteeing it is safe to dereference this 174 // pointer. Otherwise, the caller should guarantee such TaskRunner still 175 // exists before dereferencing. task_runner()176 TaskRunner* task_runner() const { return task_runner_; } 177 execution_mode()178 TaskSourceExecutionMode execution_mode() const { return execution_mode_; } 179 180 protected: 181 virtual ~TaskSource(); 182 183 virtual RunStatus WillRunTask() = 0; 184 185 // Implementations of TakeTask(), DidProcessTask() and Clear() must ensure 186 // proper synchronization iff |transaction| is nullptr. 187 virtual Task TakeTask(TaskSource::Transaction* transaction) = 0; 188 virtual bool DidProcessTask(TaskSource::Transaction* transaction) = 0; 189 190 // This may be called for each outstanding RegisteredTaskSource that's ready. 191 // The implementation needs to support this being called multiple times; 192 // unless it guarantees never to hand-out multiple RegisteredTaskSources that 193 // are concurrently ready. 194 virtual Task Clear(TaskSource::Transaction* transaction) = 0; 195 196 // Sets TaskSource priority to |priority|. 197 void UpdatePriority(TaskPriority priority); 198 199 // The TaskTraits of all Tasks in the TaskSource. 200 TaskTraits traits_; 201 202 // The cached priority for atomic access. 203 std::atomic<TaskPriority> priority_racy_; 204 205 // Synchronizes access to all members. 206 mutable CheckedLock lock_{UniversalPredecessor()}; 207 208 private: 209 friend class RefCountedThreadSafe<TaskSource>; 210 friend class RegisteredTaskSource; 211 212 // The TaskSource's position in its current PriorityQueue. Access is protected 213 // by the PriorityQueue's lock. 214 HeapHandle heap_handle_; 215 216 // A pointer to the TaskRunner that posts to this TaskSource, if any. The 217 // derived class is responsible for calling AddRef() when a TaskSource from 218 // which no Task is executing becomes non-empty and Release() when 219 // it becomes empty again (e.g. when DidProcessTask() returns false). 220 TaskRunner* task_runner_; 221 222 TaskSourceExecutionMode execution_mode_; 223 }; 224 225 // Wrapper around TaskSource to signify the intent to queue and run it. 226 // RegisteredTaskSource can only be created with TaskTracker and may only be 227 // used by a single worker at a time. However, the same task source may be 228 // registered several times, spawning multiple RegisteredTaskSources. A 229 // RegisteredTaskSource resets to its initial state when WillRunTask() fails 230 // or after DidProcessTask(), so it can be used again. 231 class BASE_EXPORT RegisteredTaskSource { 232 public: 233 RegisteredTaskSource(); 234 RegisteredTaskSource(std::nullptr_t); 235 RegisteredTaskSource(RegisteredTaskSource&& other) noexcept; 236 RegisteredTaskSource(const RegisteredTaskSource&) = delete; 237 RegisteredTaskSource& operator=(const RegisteredTaskSource&) = delete; 238 ~RegisteredTaskSource(); 239 240 RegisteredTaskSource& operator=(RegisteredTaskSource&& other); 241 242 operator bool() const { return task_source_ != nullptr; } 243 TaskSource* operator->() const { return task_source_.get(); } get()244 TaskSource* get() const { return task_source_.get(); } 245 246 static RegisteredTaskSource CreateForTesting( 247 scoped_refptr<TaskSource> task_source, 248 TaskTracker* task_tracker = nullptr); 249 250 // Can only be called if this RegisteredTaskSource is in its initial state. 251 // Returns the underlying task source. An Optional is used in preparation for 252 // the merge between ThreadPool and TaskQueueManager (in Blink). 253 // https://crbug.com/783309 254 scoped_refptr<TaskSource> Unregister(); 255 256 // Informs this TaskSource that the current worker would like to run a Task 257 // from it. Can only be called if in its initial state. Returns a RunStatus 258 // that indicates if the operation is allowed (TakeTask() can be called). 259 TaskSource::RunStatus WillRunTask(); 260 261 // Returns the next task to run from this TaskSource. This should be called 262 // only after WillRunTask() returned RunStatus::kAllowed*. |transaction| is 263 // optional and should only be provided if this operation is already part of 264 // a transaction. 265 Task TakeTask(TaskSource::Transaction* transaction = nullptr) 266 WARN_UNUSED_RESULT; 267 268 // Must be called after WillRunTask() or once the task was run if TakeTask() 269 // was called. This resets this RegisteredTaskSource to its initial state so 270 // that WillRunTask() may be called again. |transaction| is optional and 271 // should only be provided if this operation is already part of a transaction. 272 // Returns true if the TaskSource should be queued after this operation. 273 bool DidProcessTask(TaskSource::Transaction* transaction = nullptr); 274 275 // Returns a task that clears this TaskSource to make it empty. |transaction| 276 // is optional and should only be provided if this operation is already part 277 // of a transaction. 278 Task Clear(TaskSource::Transaction* transaction = nullptr) WARN_UNUSED_RESULT; 279 280 private: 281 friend class TaskTracker; 282 RegisteredTaskSource(scoped_refptr<TaskSource> task_source, 283 TaskTracker* task_tracker); 284 285 #if DCHECK_IS_ON() 286 // Indicates the step of a task execution chain. 287 enum class State { 288 kInitial, // WillRunTask() may be called. 289 kReady, // After WillRunTask() returned a valid RunStatus. 290 }; 291 292 State run_step_ = State::kInitial; 293 #endif // DCHECK_IS_ON() 294 295 scoped_refptr<TaskSource> task_source_; 296 TaskTracker* task_tracker_ = nullptr; 297 }; 298 299 // A pair of Transaction and RegisteredTaskSource. Useful to carry a 300 // RegisteredTaskSource with an associated Transaction. 301 // TODO(crbug.com/839091): Rename to RegisteredTaskSourceAndTransaction. 302 struct BASE_EXPORT TransactionWithRegisteredTaskSource { 303 public: 304 TransactionWithRegisteredTaskSource(RegisteredTaskSource task_source_in, 305 TaskSource::Transaction transaction_in); 306 307 TransactionWithRegisteredTaskSource( 308 TransactionWithRegisteredTaskSource&& other) = default; 309 TransactionWithRegisteredTaskSource( 310 const TransactionWithRegisteredTaskSource&) = delete; 311 TransactionWithRegisteredTaskSource& operator=( 312 const TransactionWithRegisteredTaskSource&) = delete; 313 ~TransactionWithRegisteredTaskSource() = default; 314 315 static TransactionWithRegisteredTaskSource FromTaskSource( 316 RegisteredTaskSource task_source_in); 317 318 RegisteredTaskSource task_source; 319 TaskSource::Transaction transaction; 320 }; 321 322 } // namespace internal 323 } // namespace base 324 325 #endif // BASE_TASK_THREAD_POOL_TASK_SOURCE_H_ 326