// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/thread_pool/thread_group_impl.h"

#include <stddef.h>

#include <algorithm>
#include <type_traits>
#include <utility>

#include "base/atomicops.h"
#include "base/auto_reset.h"
#include "base/bind.h"
#include "base/callback_helpers.h"
#include "base/compiler_specific.h"
#include "base/containers/stack_container.h"
#include "base/feature_list.h"
#include "base/location.h"
#include "base/memory/ptr_util.h"
#include "base/metrics/histogram.h"
#include "base/numerics/clamped_math.h"
#include "base/optional.h"
#include "base/ranges/algorithm.h"
#include "base/sequence_token.h"
#include "base/strings/string_util.h"
#include "base/strings/stringprintf.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/task_tracker.h"
#include "base/threading/platform_thread.h"
#include "base/threading/scoped_blocking_call.h"
#include "base/threading/scoped_blocking_call_internal.h"
#include "base/threading/thread_checker.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time_override.h"
#include "build/build_config.h"

#if defined(OS_WIN)
#include "base/win/scoped_com_initializer.h"
#include "base/win/scoped_windows_thread_environment.h"
#include "base/win/scoped_winrt_initializer.h"
#include "base/win/windows_version.h"
#endif  // defined(OS_WIN)

namespace base {
namespace internal {

namespace {

constexpr char kNumTasksBeforeDetachHistogramPrefix[] =
    "ThreadPool.NumTasksBeforeDetach.";
constexpr size_t kMaxNumberOfWorkers = 256;
// In a background thread group:
// - Blocking calls take more time than in a foreground thread group.
// - We want to minimize impact on foreground work, not maximize execution
//   throughput.
// For these reasons, the timeout to increase the maximum number of concurrent
// tasks when there is a MAY_BLOCK ScopedBlockingCall is *long*. It is not
// infinite because execution throughput should not be reduced forever if a
// task blocks forever.
//
// TODO(fdoray): On platforms without background thread groups, blocking in a
// BEST_EFFORT task should:
// 1. Increment the maximum number of concurrent tasks after a *short* timeout,
//    to allow scheduling of USER_VISIBLE/USER_BLOCKING tasks.
// 2. Increment the maximum number of concurrent BEST_EFFORT tasks after a
//    *long* timeout, because we only want to allow more BEST_EFFORT tasks to
//    be scheduled concurrently when we believe that a BEST_EFFORT task is
//    blocked forever.
// Currently, only 1. is true as the configuration is per thread group.
// TODO(https://crbug.com/927755): Fix race condition when MayBlockThreshold ==
// BlockedWorkersPoll.
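//
// For illustration (based on the constants below and on BlockingStarted()
// further down in this file): in a foreground thread group, a WILL_BLOCK
// ScopedBlockingCall raises the concurrency limit immediately, whereas a
// MAY_BLOCK call only raises it once the worker has been blocked for
// kForegroundMayBlockThreshold (1 second), as detected by the periodic
// AdjustMaxTasks() poll.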
constexpr TimeDelta kForegroundMayBlockThreshold =
    TimeDelta::FromMilliseconds(1000);
constexpr TimeDelta kForegroundBlockedWorkersPoll =
    TimeDelta::FromMilliseconds(1200);
constexpr TimeDelta kBackgroundMayBlockThreshold = TimeDelta::FromSeconds(10);
constexpr TimeDelta kBackgroundBlockedWorkersPoll = TimeDelta::FromSeconds(12);

// Only used in DCHECKs.
bool ContainsWorker(const std::vector<scoped_refptr<WorkerThread>>& workers,
                    const WorkerThread* worker) {
  auto it =
      ranges::find_if(workers, [worker](const scoped_refptr<WorkerThread>& i) {
        return i.get() == worker;
      });
  return it != workers.end();
}

}  // namespace

// Upon destruction, executes actions that control the number of active
// workers. Useful to satisfy locking requirements of these actions.
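//
// Illustrative usage sketch (mirrors call sites such as
// DidUpdateCanRunPolicy() below; not an additional code path):
//   ScopedCommandsExecutor executor(this);
//   CheckedAutoLock auto_lock(lock_);
//   EnsureEnoughWorkersLockRequired(&executor);
//   // Scheduled wake-ups/starts are flushed by ~ScopedCommandsExecutor(),
//   // after |lock_| has been released.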
class ThreadGroupImpl::ScopedCommandsExecutor
    : public ThreadGroup::BaseScopedCommandsExecutor {
 public:
  explicit ScopedCommandsExecutor(ThreadGroupImpl* outer) : outer_(outer) {}

  ScopedCommandsExecutor(const ScopedCommandsExecutor&) = delete;
  ScopedCommandsExecutor& operator=(const ScopedCommandsExecutor&) = delete;
  ~ScopedCommandsExecutor() { FlushImpl(); }

  void ScheduleWakeUp(scoped_refptr<WorkerThread> worker) {
    workers_to_wake_up_.AddWorker(std::move(worker));
  }

  void ScheduleStart(scoped_refptr<WorkerThread> worker) {
    workers_to_start_.AddWorker(std::move(worker));
  }

  void FlushWorkerCreation(CheckedLock* held_lock) {
    if (workers_to_wake_up_.empty() && workers_to_start_.empty())
      return;
    CheckedAutoUnlock auto_unlock(*held_lock);
    FlushImpl();
    workers_to_wake_up_.clear();
    workers_to_start_.clear();
    must_schedule_adjust_max_tasks_ = false;
  }

  void ScheduleAdjustMaxTasks() {
    DCHECK(!must_schedule_adjust_max_tasks_);
    must_schedule_adjust_max_tasks_ = true;
  }

  void ScheduleAddHistogramSample(HistogramBase* histogram,
                                  HistogramBase::Sample sample) {
    scheduled_histogram_samples_->emplace_back(histogram, sample);
  }

 private:
  class WorkerContainer {
   public:
    WorkerContainer() = default;
    WorkerContainer(const WorkerContainer&) = delete;
    WorkerContainer& operator=(const WorkerContainer&) = delete;

    void AddWorker(scoped_refptr<WorkerThread> worker) {
      if (!worker)
        return;
      if (!first_worker_)
        first_worker_ = std::move(worker);
      else
        additional_workers_.push_back(std::move(worker));
    }

    template <typename Action>
    void ForEachWorker(Action action) {
      if (first_worker_) {
        action(first_worker_.get());
        for (scoped_refptr<WorkerThread> worker : additional_workers_)
          action(worker.get());
      } else {
        DCHECK(additional_workers_.empty());
      }
    }

    bool empty() const { return first_worker_ == nullptr; }

    void clear() {
      first_worker_.reset();
      additional_workers_.clear();
    }

   private:
    // The purpose of |first_worker_| is to avoid a heap allocation by the
    // vector in the case where there is only one worker in the container.
    scoped_refptr<WorkerThread> first_worker_;
    std::vector<scoped_refptr<WorkerThread>> additional_workers_;
  };

  void FlushImpl() {
    CheckedLock::AssertNoLockHeldOnCurrentThread();

    // Wake up workers.
    workers_to_wake_up_.ForEachWorker(
        [](WorkerThread* worker) { worker->WakeUp(); });

    // Start workers. Happens after wake ups to prevent the case where a worker
    // enters its main function, is descheduled because it wasn't woken up yet,
    // and is woken up immediately after.
    workers_to_start_.ForEachWorker([&](WorkerThread* worker) {
      worker->Start(outer_->after_start().worker_thread_observer);
      if (outer_->worker_started_for_testing_)
        outer_->worker_started_for_testing_->Wait();
    });

    if (must_schedule_adjust_max_tasks_)
      outer_->ScheduleAdjustMaxTasks();

    if (!scheduled_histogram_samples_->empty()) {
      DCHECK_LE(scheduled_histogram_samples_->size(),
                kHistogramSampleStackSize);
      for (auto& scheduled_sample : scheduled_histogram_samples_)
        scheduled_sample.first->Add(scheduled_sample.second);
      scheduled_histogram_samples_->clear();
    }
  }

  ThreadGroupImpl* const outer_;

  WorkerContainer workers_to_wake_up_;
  WorkerContainer workers_to_start_;
  bool must_schedule_adjust_max_tasks_ = false;

  // StackVector rather than std::vector avoids heap allocations; size should
  // be high enough to store the maximum number of histogram samples added to
  // a given ScopedCommandsExecutor instance.
  static constexpr size_t kHistogramSampleStackSize = 5;
  StackVector<std::pair<HistogramBase*, HistogramBase::Sample>,
              kHistogramSampleStackSize>
      scheduled_histogram_samples_;
};

// static
constexpr size_t
    ThreadGroupImpl::ScopedCommandsExecutor::kHistogramSampleStackSize;

class ThreadGroupImpl::WorkerThreadDelegateImpl : public WorkerThread::Delegate,
                                                  public BlockingObserver {
 public:
  // |outer| owns the worker for which this delegate is constructed.
  explicit WorkerThreadDelegateImpl(TrackedRef<ThreadGroupImpl> outer);
  WorkerThreadDelegateImpl(const WorkerThreadDelegateImpl&) = delete;
  WorkerThreadDelegateImpl& operator=(const WorkerThreadDelegateImpl&) = delete;

  // OnMainExit() handles the thread-affine cleanup; WorkerThreadDelegateImpl
  // can thereafter safely be deleted from any thread.
  ~WorkerThreadDelegateImpl() override = default;

  // WorkerThread::Delegate:
  WorkerThread::ThreadLabel GetThreadLabel() const override;
  void OnMainEntry(const WorkerThread* worker) override;
  RegisteredTaskSource GetWork(WorkerThread* worker) override;
  void DidProcessTask(RegisteredTaskSource task_source) override;
  TimeDelta GetSleepTimeout() override;
  void OnMainExit(WorkerThread* worker) override;

  // BlockingObserver:
  void BlockingStarted(BlockingType blocking_type) override;
  void BlockingTypeUpgraded() override;
  void BlockingEnded() override;

  // Returns true iff the worker can get work. Cleans up the worker or puts it
  // on the idle stack if it can't get work.
  bool CanGetWorkLockRequired(ScopedCommandsExecutor* executor,
                              WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Increments max [best effort] tasks iff this worker has been within a
  // ScopedBlockingCall for more than |may_block_threshold|.
  void MaybeIncrementMaxTasksLockRequired()
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  TaskPriority current_task_priority_lock_required() const
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    return *read_any().current_task_priority;
  }

  // Exposed for AnnotateCheckedLockAcquired in
  // ThreadGroupImpl::AdjustMaxTasks()
  const CheckedLock& lock() const LOCK_RETURNED(outer_->lock_) {
    return outer_->lock_;
  }

 private:
  // Returns true if |worker| is allowed to cleanup and remove itself from the
  // thread group. Called from GetWork() when no work is available.
  bool CanCleanupLockRequired(const WorkerThread* worker) const
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Calls cleanup on |worker| and removes it from the thread group. Called from
  // GetWork() when no work is available and CanCleanupLockRequired() returns
  // true.
  void CleanupLockRequired(ScopedCommandsExecutor* executor,
                           WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Called in GetWork() when a worker becomes idle.
  void OnWorkerBecomesIdleLockRequired(WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Accessed only from the worker thread.
  struct WorkerOnly {
    // Number of tasks executed since the last time the
    // ThreadPool.NumTasksBeforeDetach histogram was recorded.
    size_t num_tasks_since_last_detach = 0;

    // Whether the worker is currently running a task (i.e. GetWork() has
    // returned a non-empty task source and DidProcessTask() hasn't been called
    // yet).
    bool is_running_task = false;

#if defined(OS_WIN)
    std::unique_ptr<win::ScopedWindowsThreadEnvironment> win_thread_environment;
#endif  // defined(OS_WIN)
  } worker_only_;

  // Writes from the worker thread protected by |outer_->lock_|. Reads from any
  // thread, protected by |outer_->lock_| when not on the worker thread.
  struct WriteWorkerReadAny {
    // The priority of the task the worker is currently running if any.
    base::Optional<TaskPriority> current_task_priority;

    // Time when BlockingStarted() was last called. Reset when BlockingEnded()
    // is called.
    TimeTicks blocking_start_time;
  } write_worker_read_any_;

  WorkerOnly& worker_only() {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return worker_only_;
  }

  WriteWorkerReadAny& write_worker() EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return write_worker_read_any_;
  }

  const WriteWorkerReadAny& read_any() const
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    return write_worker_read_any_;
  }

  const WriteWorkerReadAny& read_worker() const {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return write_worker_read_any_;
  }

  const TrackedRef<ThreadGroupImpl> outer_;

  // Whether |outer_->max_tasks_|/|outer_->max_best_effort_tasks_| was
  // incremented due to a ScopedBlockingCall on the thread.
  bool incremented_max_tasks_since_blocked_ GUARDED_BY(outer_->lock_) = false;
  bool incremented_max_best_effort_tasks_since_blocked_
      GUARDED_BY(outer_->lock_) = false;

  // Verifies that specific calls are always made from the worker thread.
  THREAD_CHECKER(worker_thread_checker_);
};

ThreadGroupImpl::ThreadGroupImpl(StringPiece histogram_label,
                                 StringPiece thread_group_label,
                                 ThreadPriority priority_hint,
                                 TrackedRef<TaskTracker> task_tracker,
                                 TrackedRef<Delegate> delegate)
    : ThreadGroup(std::move(task_tracker), std::move(delegate)),
      thread_group_label_(thread_group_label.as_string()),
      priority_hint_(priority_hint),
      idle_workers_stack_cv_for_testing_(lock_.CreateConditionVariable()),
      // Mimics the UMA_HISTOGRAM_COUNTS_1000 macro. When a worker runs more
      // than 1000 tasks before detaching, there is no need to know the exact
      // number of tasks that ran.
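      // For example, with a hypothetical |histogram_label| of "Foreground",
      // this records to "ThreadPool.NumTasksBeforeDetach.Foreground"
      // (kNumTasksBeforeDetachHistogramPrefix + |histogram_label|).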
      num_tasks_before_detach_histogram_(
          histogram_label.empty()
              ? nullptr
              : Histogram::FactoryGet(
                    JoinString(
                        {kNumTasksBeforeDetachHistogramPrefix, histogram_label},
                        ""),
                    1,
                    1000,
                    50,
                    HistogramBase::kUmaTargetedHistogramFlag)),
      tracked_ref_factory_(this) {
  DCHECK(!thread_group_label_.empty());
}

void ThreadGroupImpl::Start(
    int max_tasks,
    int max_best_effort_tasks,
    TimeDelta suggested_reclaim_time,
    scoped_refptr<SequencedTaskRunner> service_thread_task_runner,
    WorkerThreadObserver* worker_thread_observer,
    WorkerEnvironment worker_environment,
    bool synchronous_thread_start_for_testing,
    Optional<TimeDelta> may_block_threshold) {
  ThreadGroup::Start();

  DCHECK(!replacement_thread_group_);

  in_start().wakeup_after_getwork = FeatureList::IsEnabled(kWakeUpAfterGetWork);
  in_start().wakeup_strategy = kWakeUpStrategyParam.Get();
  in_start().may_block_without_delay =
      FeatureList::IsEnabled(kMayBlockWithoutDelay);
  in_start().may_block_threshold =
      may_block_threshold ? may_block_threshold.value()
                          : (priority_hint_ == ThreadPriority::NORMAL
                                 ? kForegroundMayBlockThreshold
                                 : kBackgroundMayBlockThreshold);
  in_start().blocked_workers_poll_period =
      priority_hint_ == ThreadPriority::NORMAL ? kForegroundBlockedWorkersPoll
                                               : kBackgroundBlockedWorkersPoll;

  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);

  DCHECK(workers_.empty());
  max_tasks_ = max_tasks;
  DCHECK_GE(max_tasks_, 1U);
  in_start().initial_max_tasks = max_tasks_;
  DCHECK_LE(in_start().initial_max_tasks, kMaxNumberOfWorkers);
  max_best_effort_tasks_ = max_best_effort_tasks;
  in_start().suggested_reclaim_time = suggested_reclaim_time;
  in_start().worker_environment = worker_environment;
  in_start().service_thread_task_runner = std::move(service_thread_task_runner);
  in_start().worker_thread_observer = worker_thread_observer;

#if DCHECK_IS_ON()
  in_start().initialized = true;
#endif

  if (synchronous_thread_start_for_testing) {
    worker_started_for_testing_.emplace(WaitableEvent::ResetPolicy::AUTOMATIC);
    // Don't emit a ScopedBlockingCallWithBaseSyncPrimitives from this
    // WaitableEvent or it defeats the purpose of having threads start without
    // externally visible side-effects.
    worker_started_for_testing_->declare_only_used_while_idle();
  }

  EnsureEnoughWorkersLockRequired(&executor);
}

ThreadGroupImpl::~ThreadGroupImpl() {
  // ThreadGroup should only ever be deleted:
  //  1) In tests, after JoinForTesting().
  //  2) In production, iff initialization failed.
  // In both cases |workers_| should be empty.
  DCHECK(workers_.empty());
}

void ThreadGroupImpl::UpdateSortKey(TaskSource::Transaction transaction) {
  ScopedCommandsExecutor executor(this);
  UpdateSortKeyImpl(&executor, std::move(transaction));
}

void ThreadGroupImpl::PushTaskSourceAndWakeUpWorkers(
    TransactionWithRegisteredTaskSource transaction_with_task_source) {
  ScopedCommandsExecutor executor(this);
  PushTaskSourceAndWakeUpWorkersImpl(&executor,
                                     std::move(transaction_with_task_source));
}

size_t ThreadGroupImpl::GetMaxConcurrentNonBlockedTasksDeprecated() const {
#if DCHECK_IS_ON()
  CheckedAutoLock auto_lock(lock_);
  DCHECK_NE(after_start().initial_max_tasks, 0U)
      << "GetMaxConcurrentNonBlockedTasksDeprecated() should only be called "
      << "after the thread group has started.";
#endif
  return after_start().initial_max_tasks;
}

void ThreadGroupImpl::WaitForWorkersIdleForTesting(size_t n) {
  CheckedAutoLock auto_lock(lock_);

#if DCHECK_IS_ON()
  DCHECK(!some_workers_cleaned_up_for_testing_)
      << "Workers detached prior to waiting for a specific number of idle "
         "workers. Doing the wait under such conditions is flaky. Consider "
         "setting the suggested reclaim time to TimeDelta::Max() in Start().";
#endif

  WaitForWorkersIdleLockRequiredForTesting(n);
}

void ThreadGroupImpl::WaitForAllWorkersIdleForTesting() {
  CheckedAutoLock auto_lock(lock_);
  WaitForWorkersIdleLockRequiredForTesting(workers_.size());
}

void ThreadGroupImpl::WaitForWorkersCleanedUpForTesting(size_t n) {
  CheckedAutoLock auto_lock(lock_);

  if (!num_workers_cleaned_up_for_testing_cv_)
    num_workers_cleaned_up_for_testing_cv_ = lock_.CreateConditionVariable();

  while (num_workers_cleaned_up_for_testing_ < n)
    num_workers_cleaned_up_for_testing_cv_->Wait();

  num_workers_cleaned_up_for_testing_ = 0;
}

void ThreadGroupImpl::JoinForTesting() {
  decltype(workers_) workers_copy;
  {
    CheckedAutoLock auto_lock(lock_);
    priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();

    DCHECK_GT(workers_.size(), size_t(0))
        << "Joined an unstarted thread group.";

    join_for_testing_started_ = true;

    // Ensure WorkerThreads in |workers_| do not attempt to cleanup while
    // being joined.
    worker_cleanup_disallowed_for_testing_ = true;

    // Make a copy of the WorkerThreads so that we can call
    // WorkerThread::JoinForTesting() without holding |lock_| since
    // WorkerThreads may need to access |workers_|.
    workers_copy = workers_;
  }
  for (const auto& worker : workers_copy)
    worker->JoinForTesting();

  CheckedAutoLock auto_lock(lock_);
  DCHECK(workers_ == workers_copy);
  // Release |workers_| to clear their TrackedRef against |this|.
  workers_.clear();
}

size_t ThreadGroupImpl::NumberOfWorkersForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return workers_.size();
}

size_t ThreadGroupImpl::GetMaxTasksForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return max_tasks_;
}

size_t ThreadGroupImpl::GetMaxBestEffortTasksForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return max_best_effort_tasks_;
}

size_t ThreadGroupImpl::NumberOfIdleWorkersForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return idle_workers_stack_.Size();
}

ThreadGroupImpl::WorkerThreadDelegateImpl::WorkerThreadDelegateImpl(
    TrackedRef<ThreadGroupImpl> outer)
    : outer_(std::move(outer)) {
  // Bound in OnMainEntry().
  DETACH_FROM_THREAD(worker_thread_checker_);
}

WorkerThread::ThreadLabel
ThreadGroupImpl::WorkerThreadDelegateImpl::GetThreadLabel() const {
  return WorkerThread::ThreadLabel::POOLED;
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::OnMainEntry(
    const WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  {
#if DCHECK_IS_ON()
    CheckedAutoLock auto_lock(outer_->lock_);
    DCHECK(ContainsWorker(outer_->workers_, worker));
#endif
  }

#if defined(OS_WIN)
  worker_only().win_thread_environment = GetScopedWindowsThreadEnvironment(
      outer_->after_start().worker_environment);
#endif  // defined(OS_WIN)

  PlatformThread::SetName(
      StringPrintf("ThreadPool%sWorker", outer_->thread_group_label_.c_str()));

  outer_->BindToCurrentThread();
  SetBlockingObserverForCurrentThread(this);

  if (outer_->worker_started_for_testing_) {
    // When |worker_started_for_testing_| is set, the thread that starts workers
    // should wait for a worker to have started before starting the next one,
    // and there should only be one thread that wakes up workers at a time.
    DCHECK(!outer_->worker_started_for_testing_->IsSignaled());
    outer_->worker_started_for_testing_->Signal();
  }
}

RegisteredTaskSource ThreadGroupImpl::WorkerThreadDelegateImpl::GetWork(
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(!worker_only().is_running_task);

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);

  DCHECK(ContainsWorker(outer_->workers_, worker));

  // Use this opportunity, before assigning work to this worker, to create/wake
  // additional workers if needed (doing this here allows us to reduce
  // potentially expensive create/wake directly on PostTask()).
  if (!outer_->after_start().wakeup_after_getwork &&
      outer_->after_start().wakeup_strategy !=
          WakeUpStrategy::kCentralizedWakeUps) {
    outer_->EnsureEnoughWorkersLockRequired(&executor);
    executor.FlushWorkerCreation(&outer_->lock_);
  }

  if (!CanGetWorkLockRequired(&executor, worker))
    return nullptr;

  RegisteredTaskSource task_source;
  TaskPriority priority;
  while (!task_source && !outer_->priority_queue_.IsEmpty()) {
    // Enforce the CanRunPolicy and that no more than |max_best_effort_tasks_|
    // BEST_EFFORT tasks run concurrently.
    priority = outer_->priority_queue_.PeekSortKey().priority();
    if (!outer_->task_tracker_->CanRunPriority(priority) ||
        (priority == TaskPriority::BEST_EFFORT &&
         outer_->num_running_best_effort_tasks_ >=
             outer_->max_best_effort_tasks_)) {
      break;
    }

    task_source = outer_->TakeRegisteredTaskSource(&executor);
  }
  if (!task_source) {
    OnWorkerBecomesIdleLockRequired(worker);
    return nullptr;
  }

  // Running task bookkeeping.
  worker_only().is_running_task = true;
  outer_->IncrementTasksRunningLockRequired(priority);
  DCHECK(!outer_->idle_workers_stack_.Contains(worker));
  write_worker().current_task_priority = priority;

  if (outer_->after_start().wakeup_after_getwork &&
      outer_->after_start().wakeup_strategy !=
          WakeUpStrategy::kCentralizedWakeUps) {
    outer_->EnsureEnoughWorkersLockRequired(&executor);
  }

  return task_source;
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::DidProcessTask(
    RegisteredTaskSource task_source) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(worker_only().is_running_task);
  DCHECK(read_worker().blocking_start_time.is_null());

  ++worker_only().num_tasks_since_last_detach;

  // A transaction to the TaskSource to reenqueue, if any. Instantiated here as
  // |TaskSource::lock_| is a UniversalPredecessor and must always be acquired
  // prior to acquiring a second lock.
  Optional<TransactionWithRegisteredTaskSource> transaction_with_task_source;
  if (task_source) {
    transaction_with_task_source.emplace(
        TransactionWithRegisteredTaskSource::FromTaskSource(
            std::move(task_source)));
  }

  ScopedCommandsExecutor workers_executor(outer_.get());
  ScopedReenqueueExecutor reenqueue_executor;
  CheckedAutoLock auto_lock(outer_->lock_);

  DCHECK(!incremented_max_tasks_since_blocked_);
  DCHECK(!incremented_max_best_effort_tasks_since_blocked_);

  // Running task bookkeeping.
  outer_->DecrementTasksRunningLockRequired(
      *read_worker().current_task_priority);
  worker_only().is_running_task = false;

  if (transaction_with_task_source) {
    outer_->ReEnqueueTaskSourceLockRequired(
        &workers_executor, &reenqueue_executor,
        std::move(transaction_with_task_source.value()));
  }
}

TimeDelta ThreadGroupImpl::WorkerThreadDelegateImpl::GetSleepTimeout() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  // Sleep for an extra 10% to avoid the following pathological case:
  //   0) A task is running on a timer which matches
  //      |after_start().suggested_reclaim_time|.
  //   1) The timer fires and this worker is created by
  //      MaintainAtLeastOneIdleWorkerLockRequired() because the last idle
  //      worker was assigned the task.
  //   2) This worker begins sleeping |after_start().suggested_reclaim_time| (on
  //      top of the idle stack).
  //   3) The task assigned to the other worker completes and the worker goes
  //      back on the idle stack (this worker is now second on the idle stack;
  //      its GetLastUsedTime() is set to Now()).
  //   4) The sleep in (2) expires. Since (3) was fast this worker is likely to
  //      have been second on the idle stack long enough for
  //      CanCleanupLockRequired() to be satisfied in which case this worker is
  //      cleaned up.
  //   5) The timer fires at roughly the same time and we're back to (1) if (4)
  //      resulted in a clean up; causing thread churn.
  //
  //   Sleeping 10% longer in (2) makes it much less likely that (4) occurs
  //   before (5). In that case (5) will cause (3) and refresh this worker's
  //   GetLastUsedTime(), making CanCleanupLockRequired() return false in (4)
  //   and avoiding churn.
  //
  //   Of course the same problem arises if in (0) the timer matches
  //   |after_start().suggested_reclaim_time * 1.1| but it's expected that any
  //   timer slower than |after_start().suggested_reclaim_time| will cause such
  //   churn during long idle periods. If this is a problem in practice, the
  //   standby thread configuration and algorithm should be revisited.
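  //
  //   For instance (illustrative numbers only): with
  //   |after_start().suggested_reclaim_time| = 30 seconds, the worker sleeps
  //   for 33 seconds before re-checking whether it can be cleaned up.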
  return outer_->after_start().suggested_reclaim_time * 1.1;
}

bool ThreadGroupImpl::WorkerThreadDelegateImpl::CanCleanupLockRequired(
    const WorkerThread* worker) const {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  const TimeTicks last_used_time = worker->GetLastUsedTime();
  return !last_used_time.is_null() &&
         subtle::TimeTicksNowIgnoringOverride() - last_used_time >=
             outer_->after_start().suggested_reclaim_time &&
         (outer_->workers_.size() > outer_->after_start().initial_max_tasks ||
          !FeatureList::IsEnabled(kNoDetachBelowInitialCapacity)) &&
         LIKELY(!outer_->worker_cleanup_disallowed_for_testing_);
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::CleanupLockRequired(
    ScopedCommandsExecutor* executor,
    WorkerThread* worker) {
  DCHECK(!outer_->join_for_testing_started_);
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  if (outer_->num_tasks_before_detach_histogram_) {
    executor->ScheduleAddHistogramSample(
        outer_->num_tasks_before_detach_histogram_,
        worker_only().num_tasks_since_last_detach);
  }
  worker->Cleanup();
  outer_->idle_workers_stack_.Remove(worker);

  // Remove the worker from |workers_|.
  auto worker_iter = ranges::find(outer_->workers_, worker);
  DCHECK(worker_iter != outer_->workers_.end());
  outer_->workers_.erase(worker_iter);
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::OnWorkerBecomesIdleLockRequired(
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  // Add the worker to the idle stack.
  DCHECK(!outer_->idle_workers_stack_.Contains(worker));
  outer_->idle_workers_stack_.Push(worker);
  DCHECK_LE(outer_->idle_workers_stack_.Size(), outer_->workers_.size());
  outer_->idle_workers_stack_cv_for_testing_->Broadcast();
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::OnMainExit(
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

#if DCHECK_IS_ON()
  {
    bool shutdown_complete = outer_->task_tracker_->IsShutdownComplete();
    CheckedAutoLock auto_lock(outer_->lock_);

    // |worker| should already have been removed from the idle workers stack
    // and |workers_| by the time the thread is about to exit (except when the
    // thread group is no longer going to be used, in which case it's fine for
    // there to be invalid workers in the thread group).
    if (!shutdown_complete && !outer_->join_for_testing_started_) {
      DCHECK(!outer_->idle_workers_stack_.Contains(worker));
      DCHECK(!ContainsWorker(outer_->workers_, worker));
    }
  }
#endif

#if defined(OS_WIN)
  worker_only().win_thread_environment.reset();
#endif  // defined(OS_WIN)

  // Count cleaned up workers for tests. It's important to do this here instead
  // of at the end of CleanupLockRequired() because some side-effects of
  // cleaning up happen outside the lock (e.g. recording histograms) and
  // resuming from tests must happen-after that point or checks on the main
  // thread will be flaky (crbug.com/1047733).
  CheckedAutoLock auto_lock(outer_->lock_);
  ++outer_->num_workers_cleaned_up_for_testing_;
#if DCHECK_IS_ON()
  outer_->some_workers_cleaned_up_for_testing_ = true;
#endif
  if (outer_->num_workers_cleaned_up_for_testing_cv_)
    outer_->num_workers_cleaned_up_for_testing_cv_->Signal();
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingStarted(
    BlockingType blocking_type) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(worker_only().is_running_task);

  // MayBlock with no delay reuses the WillBlock implementation. WillBlock is
  // always used when time overrides are active. crbug.com/1038867
  if (outer_->after_start().may_block_without_delay ||
      base::subtle::ScopedTimeClockOverrides::overrides_active()) {
    blocking_type = BlockingType::WILL_BLOCK;
  }

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);

  DCHECK(!incremented_max_tasks_since_blocked_);
  DCHECK(!incremented_max_best_effort_tasks_since_blocked_);
  DCHECK(read_worker().blocking_start_time.is_null());
  write_worker().blocking_start_time = subtle::TimeTicksNowIgnoringOverride();

  if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT)
    ++outer_->num_unresolved_best_effort_may_block_;

  if (blocking_type == BlockingType::WILL_BLOCK) {
    incremented_max_tasks_since_blocked_ = true;
    outer_->IncrementMaxTasksLockRequired();
    outer_->EnsureEnoughWorkersLockRequired(&executor);
  } else {
    ++outer_->num_unresolved_may_block_;
  }

  outer_->MaybeScheduleAdjustMaxTasksLockRequired(&executor);
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingTypeUpgraded() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(worker_only().is_running_task);

  // Since the blocking type is always WILL_BLOCK in this experiment and when
  // time overrides are active, it should never be considered "upgraded".
  if (outer_->after_start().may_block_without_delay ||
      base::subtle::ScopedTimeClockOverrides::overrides_active()) {
    return;
  }

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);

  // Don't do anything if a MAY_BLOCK ScopedBlockingCall instantiated in the
  // same scope already caused the max tasks to be incremented.
  if (incremented_max_tasks_since_blocked_)
    return;

  // Cancel the effect of a MAY_BLOCK ScopedBlockingCall instantiated in the
  // same scope.
  --outer_->num_unresolved_may_block_;

  incremented_max_tasks_since_blocked_ = true;
  outer_->IncrementMaxTasksLockRequired();
  outer_->EnsureEnoughWorkersLockRequired(&executor);
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingEnded() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(worker_only().is_running_task);

  CheckedAutoLock auto_lock(outer_->lock_);
  DCHECK(!read_worker().blocking_start_time.is_null());
  if (incremented_max_tasks_since_blocked_)
    outer_->DecrementMaxTasksLockRequired();
  else
    --outer_->num_unresolved_may_block_;

  if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
    if (incremented_max_best_effort_tasks_since_blocked_)
      outer_->DecrementMaxBestEffortTasksLockRequired();
    else
      --outer_->num_unresolved_best_effort_may_block_;
  }

  incremented_max_tasks_since_blocked_ = false;
  incremented_max_best_effort_tasks_since_blocked_ = false;
  write_worker().blocking_start_time = TimeTicks();
}

bool ThreadGroupImpl::WorkerThreadDelegateImpl::CanGetWorkLockRequired(
    ScopedCommandsExecutor* executor,
    WorkerThread* worker) {
  // To avoid searching through the idle stack, use a non-null
  // GetLastUsedTime() (or being directly on top of the idle stack) as a proxy
  // for being on the idle stack.
  const bool is_on_idle_workers_stack =
      outer_->idle_workers_stack_.Peek() == worker ||
      !worker->GetLastUsedTime().is_null();
  DCHECK_EQ(is_on_idle_workers_stack,
            outer_->idle_workers_stack_.Contains(worker));

  if (is_on_idle_workers_stack) {
    if (CanCleanupLockRequired(worker))
      CleanupLockRequired(executor, worker);
    return false;
  }

  // Excess workers should not get work until they are no longer excess (i.e.,
  // until the max number of tasks increases). This ensures that if we have
  // excess workers in the thread group, they get a chance to no longer be
  // excess before being cleaned up.
  if (outer_->GetNumAwakeWorkersLockRequired() >
      outer_->GetDesiredNumAwakeWorkersLockRequired()) {
    OnWorkerBecomesIdleLockRequired(worker);
    return false;
  }

  return true;
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::
    MaybeIncrementMaxTasksLockRequired() {
  if (read_any().blocking_start_time.is_null() ||
      subtle::TimeTicksNowIgnoringOverride() - read_any().blocking_start_time <
          outer_->after_start().may_block_threshold) {
    return;
  }

  if (!incremented_max_tasks_since_blocked_) {
    incremented_max_tasks_since_blocked_ = true;
    --outer_->num_unresolved_may_block_;
    outer_->IncrementMaxTasksLockRequired();
  }
  if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT &&
      !incremented_max_best_effort_tasks_since_blocked_) {
    incremented_max_best_effort_tasks_since_blocked_ = true;
    --outer_->num_unresolved_best_effort_may_block_;
    outer_->IncrementMaxBestEffortTasksLockRequired();
  }
}

void ThreadGroupImpl::WaitForWorkersIdleLockRequiredForTesting(size_t n) {
  // Make sure workers do not cleanup while watching the idle count.
  AutoReset<bool> ban_cleanups(&worker_cleanup_disallowed_for_testing_, true);

  while (idle_workers_stack_.Size() < n)
    idle_workers_stack_cv_for_testing_->Wait();
}

void ThreadGroupImpl::MaintainAtLeastOneIdleWorkerLockRequired(
    ScopedCommandsExecutor* executor) {
  if (workers_.size() == kMaxNumberOfWorkers)
    return;
  DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);

  if (!idle_workers_stack_.IsEmpty())
    return;

  if (workers_.size() >= max_tasks_)
    return;

  scoped_refptr<WorkerThread> new_worker =
      CreateAndRegisterWorkerLockRequired(executor);
  DCHECK(new_worker);
  idle_workers_stack_.Push(new_worker.get());
}

scoped_refptr<WorkerThread>
ThreadGroupImpl::CreateAndRegisterWorkerLockRequired(
    ScopedCommandsExecutor* executor) {
  DCHECK(!join_for_testing_started_);
  DCHECK_LT(workers_.size(), max_tasks_);
  DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
  DCHECK(idle_workers_stack_.IsEmpty());

  // WorkerThread needs |lock_| as a predecessor for its thread lock
  // because in WakeUpOneWorker, |lock_| is first acquired and then
  // the thread lock is acquired when WakeUp is called on the worker.
  scoped_refptr<WorkerThread> worker =
      MakeRefCounted<WorkerThread>(priority_hint_,
                                   std::make_unique<WorkerThreadDelegateImpl>(
                                       tracked_ref_factory_.GetTrackedRef()),
                                   task_tracker_, &lock_);

  workers_.push_back(worker);
  executor->ScheduleStart(worker);
  DCHECK_LE(workers_.size(), max_tasks_);

  return worker;
}

size_t ThreadGroupImpl::GetNumAwakeWorkersLockRequired() const {
  DCHECK_GE(workers_.size(), idle_workers_stack_.Size());
  size_t num_awake_workers = workers_.size() - idle_workers_stack_.Size();
  DCHECK_GE(num_awake_workers, num_running_tasks_);
  return num_awake_workers;
}

size_t ThreadGroupImpl::GetDesiredNumAwakeWorkersLockRequired() const {
  // Number of BEST_EFFORT task sources that are running or queued and allowed
  // to run by the CanRunPolicy.
  const size_t num_running_or_queued_can_run_best_effort_task_sources =
      num_running_best_effort_tasks_ +
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired();

  const size_t workers_for_best_effort_task_sources =
      std::max(std::min(num_running_or_queued_can_run_best_effort_task_sources,
                        max_best_effort_tasks_),
               num_running_best_effort_tasks_);

  // Number of USER_{VISIBLE|BLOCKING} task sources that are running or queued.
  const size_t num_running_or_queued_foreground_task_sources =
      (num_running_tasks_ - num_running_best_effort_tasks_) +
      GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();

  const size_t workers_for_foreground_task_sources =
      num_running_or_queued_foreground_task_sources;

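  // Illustrative sizing example (hypothetical numbers): with
  // |max_best_effort_tasks_| = 2, 1 running BEST_EFFORT task, 3 queued
  // BEST_EFFORT task sources, 2 running foreground tasks, 1 queued foreground
  // task source and |max_tasks_| = 8, this returns
  // min(2 + 3, 8, kMaxNumberOfWorkers) = 5.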
  return std::min({workers_for_best_effort_task_sources +
                       workers_for_foreground_task_sources,
                   max_tasks_, kMaxNumberOfWorkers});
}

void ThreadGroupImpl::DidUpdateCanRunPolicy() {
  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  EnsureEnoughWorkersLockRequired(&executor);
}

void ThreadGroupImpl::EnsureEnoughWorkersLockRequired(
    BaseScopedCommandsExecutor* base_executor) {
  // Don't do anything if the thread group isn't started.
  if (max_tasks_ == 0 || UNLIKELY(join_for_testing_started_))
    return;

  ScopedCommandsExecutor* executor =
      static_cast<ScopedCommandsExecutor*>(base_executor);

  const size_t desired_num_awake_workers =
      GetDesiredNumAwakeWorkersLockRequired();
  const size_t num_awake_workers = GetNumAwakeWorkersLockRequired();

  size_t num_workers_to_wake_up =
      ClampSub(desired_num_awake_workers, num_awake_workers);
  if (after_start().wakeup_strategy == WakeUpStrategy::kExponentialWakeUps) {
    num_workers_to_wake_up = std::min(num_workers_to_wake_up, size_t(2U));
  } else if (after_start().wakeup_strategy ==
             WakeUpStrategy::kSerializedWakeUps) {
    num_workers_to_wake_up = std::min(num_workers_to_wake_up, size_t(1U));
  }

  // Wake up the appropriate number of workers.
  for (size_t i = 0; i < num_workers_to_wake_up; ++i) {
    MaintainAtLeastOneIdleWorkerLockRequired(executor);
    WorkerThread* worker_to_wakeup = idle_workers_stack_.Pop();
    DCHECK(worker_to_wakeup);
    executor->ScheduleWakeUp(worker_to_wakeup);
  }

  // In the case where the loop above didn't wake up any worker and we don't
  // have excess workers, an idle worker should be maintained. This happens
  // when called from the last awake worker, or when a recent increase in
  // |max_tasks| now makes it possible to keep an idle worker.
  if (desired_num_awake_workers == num_awake_workers)
    MaintainAtLeastOneIdleWorkerLockRequired(executor);

  // This function is called every time a task source is (re-)enqueued,
  // hence the minimum priority needs to be updated.
  UpdateMinAllowedPriorityLockRequired();

  // Ensure that the number of workers is periodically adjusted if needed.
  MaybeScheduleAdjustMaxTasksLockRequired(executor);
}

void ThreadGroupImpl::AdjustMaxTasks() {
  DCHECK(
      after_start().service_thread_task_runner->RunsTasksInCurrentSequence());

  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  DCHECK(adjust_max_tasks_posted_);
  adjust_max_tasks_posted_ = false;

  // Increment max tasks for each worker that has been within a MAY_BLOCK
  // ScopedBlockingCall for more than may_block_threshold.
  for (scoped_refptr<WorkerThread> worker : workers_) {
    // The delegates of workers inside a ThreadGroupImpl should be
    // WorkerThreadDelegateImpls.
    WorkerThreadDelegateImpl* delegate =
        static_cast<WorkerThreadDelegateImpl*>(worker->delegate());
    AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
    delegate->MaybeIncrementMaxTasksLockRequired();
  }

  // Wake up workers according to the updated |max_tasks_|. This will also
  // reschedule AdjustMaxTasks() if necessary.
  EnsureEnoughWorkersLockRequired(&executor);
}

void ThreadGroupImpl::ScheduleAdjustMaxTasks() {
  // |adjust_max_tasks_posted_| can't change before the task posted below runs.
  // Skip check on NaCl to avoid unsafe reference acquisition warning.
#if !defined(OS_NACL)
  DCHECK(TS_UNCHECKED_READ(adjust_max_tasks_posted_));
#endif

  after_start().service_thread_task_runner->PostDelayedTask(
      FROM_HERE, BindOnce(&ThreadGroupImpl::AdjustMaxTasks, Unretained(this)),
      after_start().blocked_workers_poll_period);
}

void ThreadGroupImpl::MaybeScheduleAdjustMaxTasksLockRequired(
    ScopedCommandsExecutor* executor) {
  if (!adjust_max_tasks_posted_ &&
      ShouldPeriodicallyAdjustMaxTasksLockRequired()) {
    executor->ScheduleAdjustMaxTasks();
    adjust_max_tasks_posted_ = true;
  }
}

bool ThreadGroupImpl::ShouldPeriodicallyAdjustMaxTasksLockRequired() {
  // AdjustMaxTasks() should be scheduled to periodically adjust |max_tasks_|
  // and |max_best_effort_tasks_| when (1) the concurrency limits are not large
  // enough to accommodate all queued and running task sources and an idle
  // worker and (2) there are unresolved MAY_BLOCK ScopedBlockingCalls.
  // - When (1) is false: No worker would be created or woken up if the
  //   concurrency limits were increased, so there is no hurry to increase them.
  // - When (2) is false: The concurrency limits could not be increased by
  //   AdjustMaxTasks().
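  //
  // Illustrative example (hypothetical numbers): with |max_tasks_| = 4, 4
  // running task sources, 1 queued task source and 1 unresolved MAY_BLOCK
  // ScopedBlockingCall, (1) holds because 5 + 1 > 4 and (2) holds, so
  // AdjustMaxTasks() is scheduled.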

  const size_t num_running_or_queued_best_effort_task_sources =
      num_running_best_effort_tasks_ +
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired();
  if (num_running_or_queued_best_effort_task_sources > max_best_effort_tasks_ &&
      num_unresolved_best_effort_may_block_ > 0) {
    return true;
  }

  const size_t num_running_or_queued_task_sources =
      num_running_tasks_ +
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() +
      GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();
  constexpr size_t kIdleWorker = 1;
  return num_running_or_queued_task_sources + kIdleWorker > max_tasks_ &&
         num_unresolved_may_block_ > 0;
}

void ThreadGroupImpl::UpdateMinAllowedPriorityLockRequired() {
  if (priority_queue_.IsEmpty() || num_running_tasks_ < max_tasks_) {
    max_allowed_sort_key_.store(kMaxYieldSortKey, std::memory_order_relaxed);
  } else {
    max_allowed_sort_key_.store({priority_queue_.PeekSortKey().priority(),
                                 priority_queue_.PeekSortKey().worker_count()},
                                std::memory_order_relaxed);
  }
}

void ThreadGroupImpl::DecrementTasksRunningLockRequired(TaskPriority priority) {
  DCHECK_GT(num_running_tasks_, 0U);
  --num_running_tasks_;
  if (priority == TaskPriority::BEST_EFFORT) {
    DCHECK_GT(num_running_best_effort_tasks_, 0U);
    --num_running_best_effort_tasks_;
  }
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::IncrementTasksRunningLockRequired(TaskPriority priority) {
  ++num_running_tasks_;
  DCHECK_LE(num_running_tasks_, max_tasks_);
  DCHECK_LE(num_running_tasks_, kMaxNumberOfWorkers);
  if (priority == TaskPriority::BEST_EFFORT) {
    ++num_running_best_effort_tasks_;
    DCHECK_LE(num_running_best_effort_tasks_, num_running_tasks_);
    DCHECK_LE(num_running_best_effort_tasks_, max_best_effort_tasks_);
  }
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::DecrementMaxTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  DCHECK_GT(max_tasks_, 0U);
  --max_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::IncrementMaxTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  ++max_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::DecrementMaxBestEffortTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  DCHECK_GT(max_best_effort_tasks_, 0U);
  --max_best_effort_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::IncrementMaxBestEffortTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  ++max_best_effort_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

ThreadGroupImpl::InitializedInStart::InitializedInStart() = default;
ThreadGroupImpl::InitializedInStart::~InitializedInStart() = default;

}  // namespace internal
}  // namespace base