1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_TASK_THREAD_POOL_TASK_SOURCE_H_
6 #define BASE_TASK_THREAD_POOL_TASK_SOURCE_H_
7 
#include <stddef.h>

#include <atomic>

#include "base/base_export.h"
#include "base/compiler_specific.h"
#include "base/memory/ref_counted.h"
#include "base/optional.h"
#include "base/sequence_token.h"
#include "base/task/common/checked_lock.h"
#include "base/task/common/intrusive_heap.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source_sort_key.h"
#include "base/threading/sequence_local_storage_map.h"
21 
22 namespace base {
23 namespace internal {
24 
25 class TaskTracker;
26 
// Execution mode of a TaskSource. Per the TaskSource constructor contract this
// is the execution mode of the TaskRunner feeding the TaskSource, or kParallel
// when there is no TaskRunner.
enum class TaskSourceExecutionMode {
  kParallel,
  kSequenced,
  kSingleThread,
  kJob,
  // Alias of the last enumerator; keep in sync when adding new values.
  kMax = kJob,
};
34 
35 struct BASE_EXPORT ExecutionEnvironment {
36   SequenceToken token;
37   SequenceLocalStorageMap* sequence_local_storage;
38 };
39 
40 // A TaskSource is a virtual class that provides a series of Tasks that must be
41 // executed.
42 //
43 // A task source is registered when it's ready to be queued. A task source is
44 // ready to be queued when either:
45 // 1- It has new tasks that can run concurrently as a result of external
46 //    operations, e.g. posting a new task to an empty Sequence or increasing
47 //    max concurrency of a JobTaskSource;
48 // 2- A worker finished running a task from it and DidProcessTask() returned
49 //    true; or
50 // 3- A worker is about to run a task from it and WillRunTask() returned
51 //    kAllowedNotSaturated.
52 //
53 // A worker may perform the following sequence of operations on a
54 // RegisteredTaskSource after obtaining it from the queue:
55 // 1- Check whether a task can run with WillRunTask() (and register/enqueue the
56 //    task source again if not saturated).
57 // 2- (optional) Iff (1) determined that a task can run, access the next task
58 //    with TakeTask().
59 // 3- (optional) Execute the task.
60 // 4- Inform the task source that a task was processed with DidProcessTask(),
61 //    and re-enqueue the task source iff requested.
62 // When a task source is registered multiple times, many overlapping chains of
63 // operations may run concurrently, as permitted by WillRunTask(). This allows
64 // tasks from the same task source to run in parallel.
65 // However, the following invariants are kept:
66 // - The number of workers concurrently running tasks never goes over the
67 //   intended concurrency.
68 // - If the task source has more tasks that can run concurrently, it must be
69 //   queued.
70 //
71 // Note: there is a known refcounted-ownership cycle in the ThreadPool
72 // architecture: TaskSource -> TaskRunner -> TaskSource -> ... This is okay so
73 // long as the other owners of TaskSource (PriorityQueue and WorkerThread in
74 // alternation and ThreadGroupImpl::WorkerThreadDelegateImpl::GetWork()
75 // temporarily) keep running it (and taking Tasks from it as a result). A
76 // dangling reference cycle would only occur should they release their reference
77 // to it while it's not empty. In other words, it is only correct for them to
78 // release it when DidProcessTask() returns false.
79 //
80 // This class is thread-safe.
81 class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> {
82  public:
83   // Indicates whether WillRunTask() allows TakeTask() to be called on a
84   // RegisteredTaskSource.
85   enum class RunStatus {
86     // TakeTask() cannot be called.
87     kDisallowed,
88     // TakeTask() may called, and the TaskSource has not reached its maximum
89     // concurrency (i.e. the TaskSource still needs to be queued).
90     kAllowedNotSaturated,
91     // TakeTask() may called, and the TaskSource has reached its maximum
92     // concurrency (i.e. the TaskSource no longer needs to be queued).
93     kAllowedSaturated,
94   };
95 
96   // A Transaction can perform multiple operations atomically on a
97   // TaskSource. While a Transaction is alive, it is guaranteed that nothing
98   // else will access the TaskSource; the TaskSource's lock is held for the
99   // lifetime of the Transaction.
100   class BASE_EXPORT Transaction {
101    public:
102     Transaction(Transaction&& other);
103     Transaction(const Transaction&) = delete;
104     Transaction& operator=(const Transaction&) = delete;
105     ~Transaction();
106 
107     operator bool() const { return !!task_source_; }
108 
109     // Sets TaskSource priority to |priority|.
110     void UpdatePriority(TaskPriority priority);
111 
112     // Returns the traits of all Tasks in the TaskSource.
traits()113     TaskTraits traits() const { return task_source_->traits_; }
114 
task_source()115     TaskSource* task_source() const { return task_source_; }
116 
117    protected:
118     explicit Transaction(TaskSource* task_source);
119 
120    private:
121     friend class TaskSource;
122 
123     TaskSource* task_source_;
124   };
125 
126   // |traits| is metadata that applies to all Tasks in the TaskSource.
127   // |task_runner| is a reference to the TaskRunner feeding this TaskSource.
128   // |task_runner| can be nullptr only for tasks with no TaskRunner, in which
129   // case |execution_mode| must be kParallel. Otherwise, |execution_mode| is the
130   // execution mode of |task_runner|.
131   TaskSource(const TaskTraits& traits,
132              TaskRunner* task_runner,
133              TaskSourceExecutionMode execution_mode);
134   TaskSource(const TaskSource&) = delete;
135   TaskSource& operator=(const TaskSource&) = delete;
136 
137   // Begins a Transaction. This method cannot be called on a thread which has an
138   // active TaskSource::Transaction.
139   Transaction BeginTransaction() WARN_UNUSED_RESULT;
140 
141   virtual ExecutionEnvironment GetExecutionEnvironment() = 0;
142 
143   // Thread-safe but the returned value may immediately be obsolete. As such
144   // this should only be used as a best-effort guess of how many more workers
145   // are needed. This may be called on an empty task source.
146   virtual size_t GetRemainingConcurrency() const = 0;
147 
148   // Returns a TaskSourceSortKey representing the priority of the TaskSource.
149   virtual TaskSourceSortKey GetSortKey(bool disable_fair_scheduling) const = 0;
150 
151   // Support for IntrusiveHeap.
152   void SetHeapHandle(const HeapHandle& handle);
153   void ClearHeapHandle();
GetHeapHandle()154   HeapHandle GetHeapHandle() const { return heap_handle_; }
155 
heap_handle()156   HeapHandle heap_handle() const { return heap_handle_; }
157 
158   // Returns the shutdown behavior of all Tasks in the TaskSource. Can be
159   // accessed without a Transaction because it is never mutated.
shutdown_behavior()160   TaskShutdownBehavior shutdown_behavior() const {
161     return traits_.shutdown_behavior();
162   }
163   // Returns a racy priority of the TaskSource. Can be accessed without a
164   // Transaction but may return an outdated result.
priority_racy()165   TaskPriority priority_racy() const {
166     return priority_racy_.load(std::memory_order_relaxed);
167   }
168   // Returns the thread policy of the TaskSource. Can be accessed without a
169   // Transaction because it is never mutated.
thread_policy()170   ThreadPolicy thread_policy() const { return traits_.thread_policy(); }
171 
172   // A reference to TaskRunner is only retained between PushTask() and when
173   // DidProcessTask() returns false, guaranteeing it is safe to dereference this
174   // pointer. Otherwise, the caller should guarantee such TaskRunner still
175   // exists before dereferencing.
task_runner()176   TaskRunner* task_runner() const { return task_runner_; }
177 
execution_mode()178   TaskSourceExecutionMode execution_mode() const { return execution_mode_; }
179 
180  protected:
181   virtual ~TaskSource();
182 
183   virtual RunStatus WillRunTask() = 0;
184 
185   // Implementations of TakeTask(), DidProcessTask() and Clear() must ensure
186   // proper synchronization iff |transaction| is nullptr.
187   virtual Task TakeTask(TaskSource::Transaction* transaction) = 0;
188   virtual bool DidProcessTask(TaskSource::Transaction* transaction) = 0;
189 
190   // This may be called for each outstanding RegisteredTaskSource that's ready.
191   // The implementation needs to support this being called multiple times;
192   // unless it guarantees never to hand-out multiple RegisteredTaskSources that
193   // are concurrently ready.
194   virtual Task Clear(TaskSource::Transaction* transaction) = 0;
195 
196   // Sets TaskSource priority to |priority|.
197   void UpdatePriority(TaskPriority priority);
198 
199   // The TaskTraits of all Tasks in the TaskSource.
200   TaskTraits traits_;
201 
202   // The cached priority for atomic access.
203   std::atomic<TaskPriority> priority_racy_;
204 
205   // Synchronizes access to all members.
206   mutable CheckedLock lock_{UniversalPredecessor()};
207 
208  private:
209   friend class RefCountedThreadSafe<TaskSource>;
210   friend class RegisteredTaskSource;
211 
212   // The TaskSource's position in its current PriorityQueue. Access is protected
213   // by the PriorityQueue's lock.
214   HeapHandle heap_handle_;
215 
216   // A pointer to the TaskRunner that posts to this TaskSource, if any. The
217   // derived class is responsible for calling AddRef() when a TaskSource from
218   // which no Task is executing becomes non-empty and Release() when
219   // it becomes empty again (e.g. when DidProcessTask() returns false).
220   TaskRunner* task_runner_;
221 
222   TaskSourceExecutionMode execution_mode_;
223 };
224 
225 // Wrapper around TaskSource to signify the intent to queue and run it.
226 // RegisteredTaskSource can only be created with TaskTracker and may only be
227 // used by a single worker at a time. However, the same task source may be
228 // registered several times, spawning multiple RegisteredTaskSources. A
229 // RegisteredTaskSource resets to its initial state when WillRunTask() fails
230 // or after DidProcessTask(), so it can be used again.
231 class BASE_EXPORT RegisteredTaskSource {
232  public:
233   RegisteredTaskSource();
234   RegisteredTaskSource(std::nullptr_t);
235   RegisteredTaskSource(RegisteredTaskSource&& other) noexcept;
236   RegisteredTaskSource(const RegisteredTaskSource&) = delete;
237   RegisteredTaskSource& operator=(const RegisteredTaskSource&) = delete;
238   ~RegisteredTaskSource();
239 
240   RegisteredTaskSource& operator=(RegisteredTaskSource&& other);
241 
242   operator bool() const { return task_source_ != nullptr; }
243   TaskSource* operator->() const { return task_source_.get(); }
get()244   TaskSource* get() const { return task_source_.get(); }
245 
246   static RegisteredTaskSource CreateForTesting(
247       scoped_refptr<TaskSource> task_source,
248       TaskTracker* task_tracker = nullptr);
249 
250   // Can only be called if this RegisteredTaskSource is in its initial state.
251   // Returns the underlying task source. An Optional is used in preparation for
252   // the merge between ThreadPool and TaskQueueManager (in Blink).
253   // https://crbug.com/783309
254   scoped_refptr<TaskSource> Unregister();
255 
256   // Informs this TaskSource that the current worker would like to run a Task
257   // from it. Can only be called if in its initial state. Returns a RunStatus
258   // that indicates if the operation is allowed (TakeTask() can be called).
259   TaskSource::RunStatus WillRunTask();
260 
261   // Returns the next task to run from this TaskSource. This should be called
262   // only after WillRunTask() returned RunStatus::kAllowed*. |transaction| is
263   // optional and should only be provided if this operation is already part of
264   // a transaction.
265   Task TakeTask(TaskSource::Transaction* transaction = nullptr)
266       WARN_UNUSED_RESULT;
267 
268   // Must be called after WillRunTask() or once the task was run if TakeTask()
269   // was called. This resets this RegisteredTaskSource to its initial state so
270   // that WillRunTask() may be called again. |transaction| is optional and
271   // should only be provided if this operation is already part of a transaction.
272   // Returns true if the TaskSource should be queued after this operation.
273   bool DidProcessTask(TaskSource::Transaction* transaction = nullptr);
274 
275   // Returns a task that clears this TaskSource to make it empty. |transaction|
276   // is optional and should only be provided if this operation is already part
277   // of a transaction.
278   Task Clear(TaskSource::Transaction* transaction = nullptr) WARN_UNUSED_RESULT;
279 
280  private:
281   friend class TaskTracker;
282   RegisteredTaskSource(scoped_refptr<TaskSource> task_source,
283                        TaskTracker* task_tracker);
284 
285 #if DCHECK_IS_ON()
286   // Indicates the step of a task execution chain.
287   enum class State {
288     kInitial,       // WillRunTask() may be called.
289     kReady,         // After WillRunTask() returned a valid RunStatus.
290   };
291 
292   State run_step_ = State::kInitial;
293 #endif  // DCHECK_IS_ON()
294 
295   scoped_refptr<TaskSource> task_source_;
296   TaskTracker* task_tracker_ = nullptr;
297 };
298 
299 // A pair of Transaction and RegisteredTaskSource. Useful to carry a
300 // RegisteredTaskSource with an associated Transaction.
301 // TODO(crbug.com/839091): Rename to RegisteredTaskSourceAndTransaction.
302 struct BASE_EXPORT TransactionWithRegisteredTaskSource {
303  public:
304   TransactionWithRegisteredTaskSource(RegisteredTaskSource task_source_in,
305                                       TaskSource::Transaction transaction_in);
306 
307   TransactionWithRegisteredTaskSource(
308       TransactionWithRegisteredTaskSource&& other) = default;
309   TransactionWithRegisteredTaskSource(
310       const TransactionWithRegisteredTaskSource&) = delete;
311   TransactionWithRegisteredTaskSource& operator=(
312       const TransactionWithRegisteredTaskSource&) = delete;
313   ~TransactionWithRegisteredTaskSource() = default;
314 
315   static TransactionWithRegisteredTaskSource FromTaskSource(
316       RegisteredTaskSource task_source_in);
317 
318   RegisteredTaskSource task_source;
319   TaskSource::Transaction transaction;
320 };
321 
322 }  // namespace internal
323 }  // namespace base
324 
325 #endif  // BASE_TASK_THREAD_POOL_TASK_SOURCE_H_
326