1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/thread_group_impl.h"
6
7 #include <stddef.h>
8
9 #include <algorithm>
10 #include <type_traits>
11 #include <utility>
12
13 #include "base/atomicops.h"
14 #include "base/auto_reset.h"
15 #include "base/bind.h"
16 #include "base/callback_helpers.h"
17 #include "base/compiler_specific.h"
18 #include "base/containers/stack_container.h"
19 #include "base/feature_list.h"
20 #include "base/location.h"
21 #include "base/memory/ptr_util.h"
22 #include "base/metrics/histogram.h"
23 #include "base/numerics/clamped_math.h"
24 #include "base/optional.h"
25 #include "base/ranges/algorithm.h"
26 #include "base/sequence_token.h"
27 #include "base/strings/string_util.h"
28 #include "base/strings/stringprintf.h"
29 #include "base/task/task_traits.h"
30 #include "base/task/thread_pool/task_tracker.h"
31 #include "base/threading/platform_thread.h"
32 #include "base/threading/scoped_blocking_call.h"
33 #include "base/threading/scoped_blocking_call_internal.h"
34 #include "base/threading/thread_checker.h"
35 #include "base/threading/thread_restrictions.h"
36 #include "base/time/time_override.h"
37 #include "build/build_config.h"
38
39 #if defined(OS_WIN)
40 #include "base/win/scoped_com_initializer.h"
41 #include "base/win/scoped_windows_thread_environment.h"
42 #include "base/win/scoped_winrt_initializer.h"
43 #include "base/win/windows_version.h"
44 #endif // defined(OS_WIN)
45
46 namespace base {
47 namespace internal {
48
49 namespace {
50
51 constexpr char kNumTasksBeforeDetachHistogramPrefix[] =
52 "ThreadPool.NumTasksBeforeDetach.";
53 constexpr size_t kMaxNumberOfWorkers = 256;
54
55 // In a background thread group:
56 // - Blocking calls take more time than in a foreground thread group.
57 // - We want to minimize impact on foreground work, not maximize execution
58 // throughput.
59 // For these reasons, the timeout to increase the maximum number of concurrent
60 // tasks when there is a MAY_BLOCK ScopedBlockingCall is *long*. It is not
61 // infinite because execution throughput should not be reduced forever if a task
62 // blocks forever.
63 //
64 // TODO(fdoray): On platforms without background thread groups, blocking in a
65 // BEST_EFFORT task should:
66 // 1. Increment the maximum number of concurrent tasks after a *short* timeout,
67 // to allow scheduling of USER_VISIBLE/USER_BLOCKING tasks.
68 // 2. Increment the maximum number of concurrent BEST_EFFORT tasks after a
69 // *long* timeout, because we only want to allow more BEST_EFFORT tasks to be
70 // be scheduled concurrently when we believe that a BEST_EFFORT task is
71 // blocked forever.
72 // Currently, only 1. is true as the configuration is per thread group.
73 // TODO(https://crbug.com/927755): Fix racy condition when MayBlockThreshold ==
74 // BlockedWorkersPoll.
75 constexpr TimeDelta kForegroundMayBlockThreshold =
76 TimeDelta::FromMilliseconds(1000);
77 constexpr TimeDelta kForegroundBlockedWorkersPoll =
78 TimeDelta::FromMilliseconds(1200);
79 constexpr TimeDelta kBackgroundMayBlockThreshold = TimeDelta::FromSeconds(10);
80 constexpr TimeDelta kBackgroundBlockedWorkersPoll = TimeDelta::FromSeconds(12);
81
82 // Only used in DCHECKs.
ContainsWorker(const std::vector<scoped_refptr<WorkerThread>> & workers,const WorkerThread * worker)83 bool ContainsWorker(const std::vector<scoped_refptr<WorkerThread>>& workers,
84 const WorkerThread* worker) {
85 auto it =
86 ranges::find_if(workers, [worker](const scoped_refptr<WorkerThread>& i) {
87 return i.get() == worker;
88 });
89 return it != workers.end();
90 }
91
92 } // namespace
93
94 // Upon destruction, executes actions that control the number of active workers.
95 // Useful to satisfy locking requirements of these actions.
96 class ThreadGroupImpl::ScopedCommandsExecutor
97 : public ThreadGroup::BaseScopedCommandsExecutor {
98 public:
ScopedCommandsExecutor(ThreadGroupImpl * outer)99 explicit ScopedCommandsExecutor(ThreadGroupImpl* outer) : outer_(outer) {}
100
101 ScopedCommandsExecutor(const ScopedCommandsExecutor&) = delete;
102 ScopedCommandsExecutor& operator=(const ScopedCommandsExecutor&) = delete;
~ScopedCommandsExecutor()103 ~ScopedCommandsExecutor() { FlushImpl(); }
104
ScheduleWakeUp(scoped_refptr<WorkerThread> worker)105 void ScheduleWakeUp(scoped_refptr<WorkerThread> worker) {
106 workers_to_wake_up_.AddWorker(std::move(worker));
107 }
108
ScheduleStart(scoped_refptr<WorkerThread> worker)109 void ScheduleStart(scoped_refptr<WorkerThread> worker) {
110 workers_to_start_.AddWorker(std::move(worker));
111 }
112
FlushWorkerCreation(CheckedLock * held_lock)113 void FlushWorkerCreation(CheckedLock* held_lock) {
114 if (workers_to_wake_up_.empty() && workers_to_start_.empty())
115 return;
116 CheckedAutoUnlock auto_unlock(*held_lock);
117 FlushImpl();
118 workers_to_wake_up_.clear();
119 workers_to_start_.clear();
120 must_schedule_adjust_max_tasks_ = false;
121 }
122
ScheduleAdjustMaxTasks()123 void ScheduleAdjustMaxTasks() {
124 DCHECK(!must_schedule_adjust_max_tasks_);
125 must_schedule_adjust_max_tasks_ = true;
126 }
127
ScheduleAddHistogramSample(HistogramBase * histogram,HistogramBase::Sample sample)128 void ScheduleAddHistogramSample(HistogramBase* histogram,
129 HistogramBase::Sample sample) {
130 scheduled_histogram_samples_->emplace_back(histogram, sample);
131 }
132
133 private:
134 class WorkerContainer {
135 public:
136 WorkerContainer() = default;
137 WorkerContainer(const WorkerContainer&) = delete;
138 WorkerContainer& operator=(const WorkerContainer&) = delete;
139
AddWorker(scoped_refptr<WorkerThread> worker)140 void AddWorker(scoped_refptr<WorkerThread> worker) {
141 if (!worker)
142 return;
143 if (!first_worker_)
144 first_worker_ = std::move(worker);
145 else
146 additional_workers_.push_back(std::move(worker));
147 }
148
149 template <typename Action>
ForEachWorker(Action action)150 void ForEachWorker(Action action) {
151 if (first_worker_) {
152 action(first_worker_.get());
153 for (scoped_refptr<WorkerThread> worker : additional_workers_)
154 action(worker.get());
155 } else {
156 DCHECK(additional_workers_.empty());
157 }
158 }
159
empty() const160 bool empty() const { return first_worker_ == nullptr; }
161
clear()162 void clear() {
163 first_worker_.reset();
164 additional_workers_.clear();
165 }
166
167 private:
168 // The purpose of |first_worker| is to avoid a heap allocation by the vector
169 // in the case where there is only one worker in the container.
170 scoped_refptr<WorkerThread> first_worker_;
171 std::vector<scoped_refptr<WorkerThread>> additional_workers_;
172 };
173
FlushImpl()174 void FlushImpl() {
175 CheckedLock::AssertNoLockHeldOnCurrentThread();
176
177 // Wake up workers.
178 workers_to_wake_up_.ForEachWorker(
179 [](WorkerThread* worker) { worker->WakeUp(); });
180
181 // Start workers. Happens after wake ups to prevent the case where a worker
182 // enters its main function, is descheduled because it wasn't woken up yet,
183 // and is woken up immediately after.
184 workers_to_start_.ForEachWorker([&](WorkerThread* worker) {
185 worker->Start(outer_->after_start().worker_thread_observer);
186 if (outer_->worker_started_for_testing_)
187 outer_->worker_started_for_testing_->Wait();
188 });
189
190 if (must_schedule_adjust_max_tasks_)
191 outer_->ScheduleAdjustMaxTasks();
192
193 if (!scheduled_histogram_samples_->empty()) {
194 DCHECK_LE(scheduled_histogram_samples_->size(),
195 kHistogramSampleStackSize);
196 for (auto& scheduled_sample : scheduled_histogram_samples_)
197 scheduled_sample.first->Add(scheduled_sample.second);
198 scheduled_histogram_samples_->clear();
199 }
200 }
201
202 ThreadGroupImpl* const outer_;
203
204 WorkerContainer workers_to_wake_up_;
205 WorkerContainer workers_to_start_;
206 bool must_schedule_adjust_max_tasks_ = false;
207
208 // StackVector rather than std::vector avoid heap allocations; size should be
209 // high enough to store the maximum number of histogram samples added to a
210 // given ScopedCommandsExecutor instance.
211 static constexpr size_t kHistogramSampleStackSize = 5;
212 StackVector<std::pair<HistogramBase*, HistogramBase::Sample>,
213 kHistogramSampleStackSize>
214 scheduled_histogram_samples_;
215 };
216
217 // static
218 constexpr size_t
219 ThreadGroupImpl::ScopedCommandsExecutor::kHistogramSampleStackSize;
220
221 class ThreadGroupImpl::WorkerThreadDelegateImpl : public WorkerThread::Delegate,
222 public BlockingObserver {
223 public:
224 // |outer| owns the worker for which this delegate is constructed.
225 explicit WorkerThreadDelegateImpl(TrackedRef<ThreadGroupImpl> outer);
226 WorkerThreadDelegateImpl(const WorkerThreadDelegateImpl&) = delete;
227 WorkerThreadDelegateImpl& operator=(const WorkerThreadDelegateImpl&) = delete;
228
229 // OnMainExit() handles the thread-affine cleanup; WorkerThreadDelegateImpl
230 // can thereafter safely be deleted from any thread.
231 ~WorkerThreadDelegateImpl() override = default;
232
233 // WorkerThread::Delegate:
234 WorkerThread::ThreadLabel GetThreadLabel() const override;
235 void OnMainEntry(const WorkerThread* worker) override;
236 RegisteredTaskSource GetWork(WorkerThread* worker) override;
237 void DidProcessTask(RegisteredTaskSource task_source) override;
238 TimeDelta GetSleepTimeout() override;
239 void OnMainExit(WorkerThread* worker) override;
240
241 // BlockingObserver:
242 void BlockingStarted(BlockingType blocking_type) override;
243 void BlockingTypeUpgraded() override;
244 void BlockingEnded() override;
245
246 // Returns true iff the worker can get work. Cleans up the worker or puts it
247 // on the idle stack if it can't get work.
248 bool CanGetWorkLockRequired(ScopedCommandsExecutor* executor,
249 WorkerThread* worker)
250 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);
251
252 // Increments max [best effort] tasks iff this worker has been within a
253 // ScopedBlockingCall for more than |may_block_threshold|.
254 void MaybeIncrementMaxTasksLockRequired()
255 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);
256
current_task_priority_lock_required() const257 TaskPriority current_task_priority_lock_required() const
258 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
259 return *read_any().current_task_priority;
260 }
261
262 // Exposed for AnnotateCheckedLockAcquired in
263 // ThreadGroupImpl::AdjustMaxTasks()
lock() const264 const CheckedLock& lock() const LOCK_RETURNED(outer_->lock_) {
265 return outer_->lock_;
266 }
267
268 private:
269 // Returns true if |worker| is allowed to cleanup and remove itself from the
270 // thread group. Called from GetWork() when no work is available.
271 bool CanCleanupLockRequired(const WorkerThread* worker) const
272 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);
273
274 // Calls cleanup on |worker| and removes it from the thread group. Called from
275 // GetWork() when no work is available and CanCleanupLockRequired() returns
276 // true.
277 void CleanupLockRequired(ScopedCommandsExecutor* executor,
278 WorkerThread* worker)
279 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);
280
281 // Called in GetWork() when a worker becomes idle.
282 void OnWorkerBecomesIdleLockRequired(WorkerThread* worker)
283 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);
284
285 // Accessed only from the worker thread.
286 struct WorkerOnly {
287 // Number of tasks executed since the last time the
288 // ThreadPool.NumTasksBeforeDetach histogram was recorded.
289 size_t num_tasks_since_last_detach = 0;
290
291 // Whether the worker is currently running a task (i.e. GetWork() has
292 // returned a non-empty task source and DidProcessTask() hasn't been called
293 // yet).
294 bool is_running_task = false;
295
296 #if defined(OS_WIN)
297 std::unique_ptr<win::ScopedWindowsThreadEnvironment> win_thread_environment;
298 #endif // defined(OS_WIN)
299 } worker_only_;
300
301 // Writes from the worker thread protected by |outer_->lock_|. Reads from any
302 // thread, protected by |outer_->lock_| when not on the worker thread.
303 struct WriteWorkerReadAny {
304 // The priority of the task the worker is currently running if any.
305 base::Optional<TaskPriority> current_task_priority;
306
307 // Time when MayBlockScopeEntered() was last called. Reset when
308 // BlockingScopeExited() is called.
309 TimeTicks blocking_start_time;
310 } write_worker_read_any_;
311
worker_only()312 WorkerOnly& worker_only() {
313 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
314 return worker_only_;
315 }
316
write_worker()317 WriteWorkerReadAny& write_worker() EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
318 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
319 return write_worker_read_any_;
320 }
321
read_any() const322 const WriteWorkerReadAny& read_any() const
323 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
324 return write_worker_read_any_;
325 }
326
read_worker() const327 const WriteWorkerReadAny& read_worker() const {
328 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
329 return write_worker_read_any_;
330 }
331
332 const TrackedRef<ThreadGroupImpl> outer_;
333
334 // Whether |outer_->max_tasks_|/|outer_->max_best_effort_tasks_| was
335 // incremented due to a ScopedBlockingCall on the thread.
336 bool incremented_max_tasks_since_blocked_ GUARDED_BY(outer_->lock_) = false;
337 bool incremented_max_best_effort_tasks_since_blocked_
338 GUARDED_BY(outer_->lock_) = false;
339
340 // Verifies that specific calls are always made from the worker thread.
341 THREAD_CHECKER(worker_thread_checker_);
342 };
343
ThreadGroupImpl(StringPiece histogram_label,StringPiece thread_group_label,ThreadPriority priority_hint,TrackedRef<TaskTracker> task_tracker,TrackedRef<Delegate> delegate)344 ThreadGroupImpl::ThreadGroupImpl(StringPiece histogram_label,
345 StringPiece thread_group_label,
346 ThreadPriority priority_hint,
347 TrackedRef<TaskTracker> task_tracker,
348 TrackedRef<Delegate> delegate)
349 : ThreadGroup(std::move(task_tracker), std::move(delegate)),
350 thread_group_label_(thread_group_label.as_string()),
351 priority_hint_(priority_hint),
352 idle_workers_stack_cv_for_testing_(lock_.CreateConditionVariable()),
353 // Mimics the UMA_HISTOGRAM_COUNTS_1000 macro. When a worker runs more
354 // than 1000 tasks before detaching, there is no need to know the exact
355 // number of tasks that ran.
356 num_tasks_before_detach_histogram_(
357 histogram_label.empty()
358 ? nullptr
359 : Histogram::FactoryGet(
360 JoinString(
361 {kNumTasksBeforeDetachHistogramPrefix, histogram_label},
362 ""),
363 1,
364 1000,
365 50,
366 HistogramBase::kUmaTargetedHistogramFlag)),
367 tracked_ref_factory_(this) {
368 DCHECK(!thread_group_label_.empty());
369 }
370
Start(int max_tasks,int max_best_effort_tasks,TimeDelta suggested_reclaim_time,scoped_refptr<SequencedTaskRunner> service_thread_task_runner,WorkerThreadObserver * worker_thread_observer,WorkerEnvironment worker_environment,bool synchronous_thread_start_for_testing,Optional<TimeDelta> may_block_threshold)371 void ThreadGroupImpl::Start(
372 int max_tasks,
373 int max_best_effort_tasks,
374 TimeDelta suggested_reclaim_time,
375 scoped_refptr<SequencedTaskRunner> service_thread_task_runner,
376 WorkerThreadObserver* worker_thread_observer,
377 WorkerEnvironment worker_environment,
378 bool synchronous_thread_start_for_testing,
379 Optional<TimeDelta> may_block_threshold) {
380 ThreadGroup::Start();
381
382 DCHECK(!replacement_thread_group_);
383
384 in_start().wakeup_after_getwork = FeatureList::IsEnabled(kWakeUpAfterGetWork);
385 in_start().wakeup_strategy = kWakeUpStrategyParam.Get();
386 in_start().may_block_without_delay =
387 FeatureList::IsEnabled(kMayBlockWithoutDelay);
388 in_start().may_block_threshold =
389 may_block_threshold ? may_block_threshold.value()
390 : (priority_hint_ == ThreadPriority::NORMAL
391 ? kForegroundMayBlockThreshold
392 : kBackgroundMayBlockThreshold);
393 in_start().blocked_workers_poll_period =
394 priority_hint_ == ThreadPriority::NORMAL ? kForegroundBlockedWorkersPoll
395 : kBackgroundBlockedWorkersPoll;
396
397 ScopedCommandsExecutor executor(this);
398 CheckedAutoLock auto_lock(lock_);
399
400 DCHECK(workers_.empty());
401 max_tasks_ = max_tasks;
402 DCHECK_GE(max_tasks_, 1U);
403 in_start().initial_max_tasks = max_tasks_;
404 DCHECK_LE(in_start().initial_max_tasks, kMaxNumberOfWorkers);
405 max_best_effort_tasks_ = max_best_effort_tasks;
406 in_start().suggested_reclaim_time = suggested_reclaim_time;
407 in_start().worker_environment = worker_environment;
408 in_start().service_thread_task_runner = std::move(service_thread_task_runner);
409 in_start().worker_thread_observer = worker_thread_observer;
410
411 #if DCHECK_IS_ON()
412 in_start().initialized = true;
413 #endif
414
415 if (synchronous_thread_start_for_testing) {
416 worker_started_for_testing_.emplace(WaitableEvent::ResetPolicy::AUTOMATIC);
417 // Don't emit a ScopedBlockingCallWithBaseSyncPrimitives from this
418 // WaitableEvent or it defeats the purpose of having threads start without
419 // externally visible side-effects.
420 worker_started_for_testing_->declare_only_used_while_idle();
421 }
422
423 EnsureEnoughWorkersLockRequired(&executor);
424 }
425
~ThreadGroupImpl()426 ThreadGroupImpl::~ThreadGroupImpl() {
427 // ThreadGroup should only ever be deleted:
428 // 1) In tests, after JoinForTesting().
429 // 2) In production, iff initialization failed.
430 // In both cases |workers_| should be empty.
431 DCHECK(workers_.empty());
432 }
433
UpdateSortKey(TaskSource::Transaction transaction)434 void ThreadGroupImpl::UpdateSortKey(TaskSource::Transaction transaction) {
435 ScopedCommandsExecutor executor(this);
436 UpdateSortKeyImpl(&executor, std::move(transaction));
437 }
438
PushTaskSourceAndWakeUpWorkers(TransactionWithRegisteredTaskSource transaction_with_task_source)439 void ThreadGroupImpl::PushTaskSourceAndWakeUpWorkers(
440 TransactionWithRegisteredTaskSource transaction_with_task_source) {
441 ScopedCommandsExecutor executor(this);
442 PushTaskSourceAndWakeUpWorkersImpl(&executor,
443 std::move(transaction_with_task_source));
444 }
445
GetMaxConcurrentNonBlockedTasksDeprecated() const446 size_t ThreadGroupImpl::GetMaxConcurrentNonBlockedTasksDeprecated() const {
447 #if DCHECK_IS_ON()
448 CheckedAutoLock auto_lock(lock_);
449 DCHECK_NE(after_start().initial_max_tasks, 0U)
450 << "GetMaxConcurrentTasksDeprecated() should only be called after the "
451 << "thread group has started.";
452 #endif
453 return after_start().initial_max_tasks;
454 }
455
WaitForWorkersIdleForTesting(size_t n)456 void ThreadGroupImpl::WaitForWorkersIdleForTesting(size_t n) {
457 CheckedAutoLock auto_lock(lock_);
458
459 #if DCHECK_IS_ON()
460 DCHECK(!some_workers_cleaned_up_for_testing_)
461 << "Workers detached prior to waiting for a specific number of idle "
462 "workers. Doing the wait under such conditions is flaky. Consider "
463 "setting the suggested reclaim time to TimeDelta::Max() in Start().";
464 #endif
465
466 WaitForWorkersIdleLockRequiredForTesting(n);
467 }
468
WaitForAllWorkersIdleForTesting()469 void ThreadGroupImpl::WaitForAllWorkersIdleForTesting() {
470 CheckedAutoLock auto_lock(lock_);
471 WaitForWorkersIdleLockRequiredForTesting(workers_.size());
472 }
473
WaitForWorkersCleanedUpForTesting(size_t n)474 void ThreadGroupImpl::WaitForWorkersCleanedUpForTesting(size_t n) {
475 CheckedAutoLock auto_lock(lock_);
476
477 if (!num_workers_cleaned_up_for_testing_cv_)
478 num_workers_cleaned_up_for_testing_cv_ = lock_.CreateConditionVariable();
479
480 while (num_workers_cleaned_up_for_testing_ < n)
481 num_workers_cleaned_up_for_testing_cv_->Wait();
482
483 num_workers_cleaned_up_for_testing_ = 0;
484 }
485
JoinForTesting()486 void ThreadGroupImpl::JoinForTesting() {
487 decltype(workers_) workers_copy;
488 {
489 CheckedAutoLock auto_lock(lock_);
490 priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();
491
492 DCHECK_GT(workers_.size(), size_t(0))
493 << "Joined an unstarted thread group.";
494
495 join_for_testing_started_ = true;
496
497 // Ensure WorkerThreads in |workers_| do not attempt to cleanup while
498 // being joined.
499 worker_cleanup_disallowed_for_testing_ = true;
500
501 // Make a copy of the WorkerThreads so that we can call
502 // WorkerThread::JoinForTesting() without holding |lock_| since
503 // WorkerThreads may need to access |workers_|.
504 workers_copy = workers_;
505 }
506 for (const auto& worker : workers_copy)
507 worker->JoinForTesting();
508
509 CheckedAutoLock auto_lock(lock_);
510 DCHECK(workers_ == workers_copy);
511 // Release |workers_| to clear their TrackedRef against |this|.
512 workers_.clear();
513 }
514
NumberOfWorkersForTesting() const515 size_t ThreadGroupImpl::NumberOfWorkersForTesting() const {
516 CheckedAutoLock auto_lock(lock_);
517 return workers_.size();
518 }
519
GetMaxTasksForTesting() const520 size_t ThreadGroupImpl::GetMaxTasksForTesting() const {
521 CheckedAutoLock auto_lock(lock_);
522 return max_tasks_;
523 }
524
GetMaxBestEffortTasksForTesting() const525 size_t ThreadGroupImpl::GetMaxBestEffortTasksForTesting() const {
526 CheckedAutoLock auto_lock(lock_);
527 return max_best_effort_tasks_;
528 }
529
NumberOfIdleWorkersForTesting() const530 size_t ThreadGroupImpl::NumberOfIdleWorkersForTesting() const {
531 CheckedAutoLock auto_lock(lock_);
532 return idle_workers_stack_.Size();
533 }
534
WorkerThreadDelegateImpl(TrackedRef<ThreadGroupImpl> outer)535 ThreadGroupImpl::WorkerThreadDelegateImpl::WorkerThreadDelegateImpl(
536 TrackedRef<ThreadGroupImpl> outer)
537 : outer_(std::move(outer)) {
538 // Bound in OnMainEntry().
539 DETACH_FROM_THREAD(worker_thread_checker_);
540 }
541
542 WorkerThread::ThreadLabel
GetThreadLabel() const543 ThreadGroupImpl::WorkerThreadDelegateImpl::GetThreadLabel() const {
544 return WorkerThread::ThreadLabel::POOLED;
545 }
546
OnMainEntry(const WorkerThread * worker)547 void ThreadGroupImpl::WorkerThreadDelegateImpl::OnMainEntry(
548 const WorkerThread* worker) {
549 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
550
551 {
552 #if DCHECK_IS_ON()
553 CheckedAutoLock auto_lock(outer_->lock_);
554 DCHECK(ContainsWorker(outer_->workers_, worker));
555 #endif
556 }
557
558 #if defined(OS_WIN)
559 worker_only().win_thread_environment = GetScopedWindowsThreadEnvironment(
560 outer_->after_start().worker_environment);
561 #endif // defined(OS_WIN)
562
563 PlatformThread::SetName(
564 StringPrintf("ThreadPool%sWorker", outer_->thread_group_label_.c_str()));
565
566 outer_->BindToCurrentThread();
567 SetBlockingObserverForCurrentThread(this);
568
569 if (outer_->worker_started_for_testing_) {
570 // When |worker_started_for_testing_| is set, the thread that starts workers
571 // should wait for a worker to have started before starting the next one,
572 // and there should only be one thread that wakes up workers at a time.
573 DCHECK(!outer_->worker_started_for_testing_->IsSignaled());
574 outer_->worker_started_for_testing_->Signal();
575 }
576 }
577
GetWork(WorkerThread * worker)578 RegisteredTaskSource ThreadGroupImpl::WorkerThreadDelegateImpl::GetWork(
579 WorkerThread* worker) {
580 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
581 DCHECK(!worker_only().is_running_task);
582
583 ScopedCommandsExecutor executor(outer_.get());
584 CheckedAutoLock auto_lock(outer_->lock_);
585
586 DCHECK(ContainsWorker(outer_->workers_, worker));
587
588 // Use this opportunity, before assigning work to this worker, to create/wake
589 // additional workers if needed (doing this here allows us to reduce
590 // potentially expensive create/wake directly on PostTask()).
591 if (!outer_->after_start().wakeup_after_getwork &&
592 outer_->after_start().wakeup_strategy !=
593 WakeUpStrategy::kCentralizedWakeUps) {
594 outer_->EnsureEnoughWorkersLockRequired(&executor);
595 executor.FlushWorkerCreation(&outer_->lock_);
596 }
597
598 if (!CanGetWorkLockRequired(&executor, worker))
599 return nullptr;
600
601 RegisteredTaskSource task_source;
602 TaskPriority priority;
603 while (!task_source && !outer_->priority_queue_.IsEmpty()) {
604 // Enforce the CanRunPolicy and that no more than |max_best_effort_tasks_|
605 // BEST_EFFORT tasks run concurrently.
606 priority = outer_->priority_queue_.PeekSortKey().priority();
607 if (!outer_->task_tracker_->CanRunPriority(priority) ||
608 (priority == TaskPriority::BEST_EFFORT &&
609 outer_->num_running_best_effort_tasks_ >=
610 outer_->max_best_effort_tasks_)) {
611 break;
612 }
613
614 task_source = outer_->TakeRegisteredTaskSource(&executor);
615 }
616 if (!task_source) {
617 OnWorkerBecomesIdleLockRequired(worker);
618 return nullptr;
619 }
620
621 // Running task bookkeeping.
622 worker_only().is_running_task = true;
623 outer_->IncrementTasksRunningLockRequired(priority);
624 DCHECK(!outer_->idle_workers_stack_.Contains(worker));
625 write_worker().current_task_priority = priority;
626
627 if (outer_->after_start().wakeup_after_getwork &&
628 outer_->after_start().wakeup_strategy !=
629 WakeUpStrategy::kCentralizedWakeUps) {
630 outer_->EnsureEnoughWorkersLockRequired(&executor);
631 }
632
633 return task_source;
634 }
635
DidProcessTask(RegisteredTaskSource task_source)636 void ThreadGroupImpl::WorkerThreadDelegateImpl::DidProcessTask(
637 RegisteredTaskSource task_source) {
638 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
639 DCHECK(worker_only().is_running_task);
640 DCHECK(read_worker().blocking_start_time.is_null());
641
642 ++worker_only().num_tasks_since_last_detach;
643
644 // A transaction to the TaskSource to reenqueue, if any. Instantiated here as
645 // |TaskSource::lock_| is a UniversalPredecessor and must always be acquired
646 // prior to acquiring a second lock
647 Optional<TransactionWithRegisteredTaskSource> transaction_with_task_source;
648 if (task_source) {
649 transaction_with_task_source.emplace(
650 TransactionWithRegisteredTaskSource::FromTaskSource(
651 std::move(task_source)));
652 }
653
654 ScopedCommandsExecutor workers_executor(outer_.get());
655 ScopedReenqueueExecutor reenqueue_executor;
656 CheckedAutoLock auto_lock(outer_->lock_);
657
658 DCHECK(!incremented_max_tasks_since_blocked_);
659 DCHECK(!incremented_max_best_effort_tasks_since_blocked_);
660
661 // Running task bookkeeping.
662 outer_->DecrementTasksRunningLockRequired(
663 *read_worker().current_task_priority);
664 worker_only().is_running_task = false;
665
666 if (transaction_with_task_source) {
667 outer_->ReEnqueueTaskSourceLockRequired(
668 &workers_executor, &reenqueue_executor,
669 std::move(transaction_with_task_source.value()));
670 }
671 }
672
GetSleepTimeout()673 TimeDelta ThreadGroupImpl::WorkerThreadDelegateImpl::GetSleepTimeout() {
674 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
675 // Sleep for an extra 10% to avoid the following pathological case:
676
677 // 0) A task is running on a timer which matches
678 // |after_start().suggested_reclaim_time|.
679 // 1) The timer fires and this worker is created by
680 // MaintainAtLeastOneIdleWorkerLockRequired() because the last idle
681 // worker was assigned the task.
682 // 2) This worker begins sleeping |after_start().suggested_reclaim_time| (on
683 // top of the idle stack).
684 // 3) The task assigned to the other worker completes and the worker goes
685 // back on the idle stack (this worker is now second on the idle stack;
686 // its GetLastUsedTime() is set to Now()).
687 // 4) The sleep in (2) expires. Since (3) was fast this worker is likely to
688 // have been second on the idle stack long enough for
689 // CanCleanupLockRequired() to be satisfied in which case this worker is
690 // cleaned up.
691 // 5) The timer fires at roughly the same time and we're back to (1) if (4)
692 // resulted in a clean up; causing thread churn.
693 //
694 // Sleeping 10% longer in (2) makes it much less likely that (4) occurs
695 // before (5). In that case (5) will cause (3) and refresh this worker's
696 // GetLastUsedTime(), making CanCleanupLockRequired() return false in (4)
697 // and avoiding churn.
698 //
699 // Of course the same problem arises if in (0) the timer matches
700 // |after_start().suggested_reclaim_time * 1.1| but it's expected that any
701 // timer slower than |after_start().suggested_reclaim_time| will cause such
702 // churn during long idle periods. If this is a problem in practice, the
703 // standby thread configuration and algorithm should be revisited.
704 return outer_->after_start().suggested_reclaim_time * 1.1;
705 }
706
CanCleanupLockRequired(const WorkerThread * worker) const707 bool ThreadGroupImpl::WorkerThreadDelegateImpl::CanCleanupLockRequired(
708 const WorkerThread* worker) const {
709 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
710
711 const TimeTicks last_used_time = worker->GetLastUsedTime();
712 return !last_used_time.is_null() &&
713 subtle::TimeTicksNowIgnoringOverride() - last_used_time >=
714 outer_->after_start().suggested_reclaim_time &&
715 (outer_->workers_.size() > outer_->after_start().initial_max_tasks ||
716 !FeatureList::IsEnabled(kNoDetachBelowInitialCapacity)) &&
717 LIKELY(!outer_->worker_cleanup_disallowed_for_testing_);
718 }
719
CleanupLockRequired(ScopedCommandsExecutor * executor,WorkerThread * worker)720 void ThreadGroupImpl::WorkerThreadDelegateImpl::CleanupLockRequired(
721 ScopedCommandsExecutor* executor,
722 WorkerThread* worker) {
723 DCHECK(!outer_->join_for_testing_started_);
724 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
725
726 if (outer_->num_tasks_before_detach_histogram_) {
727 executor->ScheduleAddHistogramSample(
728 outer_->num_tasks_before_detach_histogram_,
729 worker_only().num_tasks_since_last_detach);
730 }
731 worker->Cleanup();
732 outer_->idle_workers_stack_.Remove(worker);
733
734 // Remove the worker from |workers_|.
735 auto worker_iter = ranges::find(outer_->workers_, worker);
736 DCHECK(worker_iter != outer_->workers_.end());
737 outer_->workers_.erase(worker_iter);
738 }
739
OnWorkerBecomesIdleLockRequired(WorkerThread * worker)740 void ThreadGroupImpl::WorkerThreadDelegateImpl::OnWorkerBecomesIdleLockRequired(
741 WorkerThread* worker) {
742 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
743
744 // Add the worker to the idle stack.
745 DCHECK(!outer_->idle_workers_stack_.Contains(worker));
746 outer_->idle_workers_stack_.Push(worker);
747 DCHECK_LE(outer_->idle_workers_stack_.Size(), outer_->workers_.size());
748 outer_->idle_workers_stack_cv_for_testing_->Broadcast();
749 }
750
OnMainExit(WorkerThread * worker)751 void ThreadGroupImpl::WorkerThreadDelegateImpl::OnMainExit(
752 WorkerThread* worker) {
753 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
754
755 #if DCHECK_IS_ON()
756 {
757 bool shutdown_complete = outer_->task_tracker_->IsShutdownComplete();
758 CheckedAutoLock auto_lock(outer_->lock_);
759
760 // |worker| should already have been removed from the idle workers stack and
761 // |workers_| by the time the thread is about to exit. (except in the cases
762 // where the thread group is no longer going to be used - in which case,
763 // it's fine for there to be invalid workers in the thread group.
764 if (!shutdown_complete && !outer_->join_for_testing_started_) {
765 DCHECK(!outer_->idle_workers_stack_.Contains(worker));
766 DCHECK(!ContainsWorker(outer_->workers_, worker));
767 }
768 }
769 #endif
770
771 #if defined(OS_WIN)
772 worker_only().win_thread_environment.reset();
773 #endif // defined(OS_WIN)
774
775 // Count cleaned up workers for tests. It's important to do this here instead
776 // of at the end of CleanupLockRequired() because some side-effects of
777 // cleaning up happen outside the lock (e.g. recording histograms) and
778 // resuming from tests must happen-after that point or checks on the main
779 // thread will be flaky (crbug.com/1047733).
780 CheckedAutoLock auto_lock(outer_->lock_);
781 ++outer_->num_workers_cleaned_up_for_testing_;
782 #if DCHECK_IS_ON()
783 outer_->some_workers_cleaned_up_for_testing_ = true;
784 #endif
785 if (outer_->num_workers_cleaned_up_for_testing_cv_)
786 outer_->num_workers_cleaned_up_for_testing_cv_->Signal();
787 }
788
BlockingStarted(BlockingType blocking_type)789 void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingStarted(
790 BlockingType blocking_type) {
791 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
792 DCHECK(worker_only().is_running_task);
793
794 // MayBlock with no delay reuses WillBlock implementation.
795 // WillBlock is always used when time overrides is active. crbug.com/1038867
796 if (outer_->after_start().may_block_without_delay ||
797 base::subtle::ScopedTimeClockOverrides::overrides_active()) {
798 blocking_type = BlockingType::WILL_BLOCK;
799 }
800
801 ScopedCommandsExecutor executor(outer_.get());
802 CheckedAutoLock auto_lock(outer_->lock_);
803
804 DCHECK(!incremented_max_tasks_since_blocked_);
805 DCHECK(!incremented_max_best_effort_tasks_since_blocked_);
806 DCHECK(read_worker().blocking_start_time.is_null());
807 write_worker().blocking_start_time = subtle::TimeTicksNowIgnoringOverride();
808
809 if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT)
810 ++outer_->num_unresolved_best_effort_may_block_;
811
812 if (blocking_type == BlockingType::WILL_BLOCK) {
813 incremented_max_tasks_since_blocked_ = true;
814 outer_->IncrementMaxTasksLockRequired();
815 outer_->EnsureEnoughWorkersLockRequired(&executor);
816 } else {
817 ++outer_->num_unresolved_may_block_;
818 }
819
820 outer_->MaybeScheduleAdjustMaxTasksLockRequired(&executor);
821 }
822
BlockingTypeUpgraded()823 void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingTypeUpgraded() {
824 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
825 DCHECK(worker_only().is_running_task);
826
827 // The blocking type always being WILL_BLOCK in this experiment and with time
828 // overrides, it should never be considered "upgraded".
829 if (outer_->after_start().may_block_without_delay ||
830 base::subtle::ScopedTimeClockOverrides::overrides_active()) {
831 return;
832 }
833
834 ScopedCommandsExecutor executor(outer_.get());
835 CheckedAutoLock auto_lock(outer_->lock_);
836
837 // Don't do anything if a MAY_BLOCK ScopedBlockingCall instantiated in the
838 // same scope already caused the max tasks to be incremented.
839 if (incremented_max_tasks_since_blocked_)
840 return;
841
842 // Cancel the effect of a MAY_BLOCK ScopedBlockingCall instantiated in the
843 // same scope.
844 --outer_->num_unresolved_may_block_;
845
846 incremented_max_tasks_since_blocked_ = true;
847 outer_->IncrementMaxTasksLockRequired();
848 outer_->EnsureEnoughWorkersLockRequired(&executor);
849 }
850
BlockingEnded()851 void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingEnded() {
852 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
853 DCHECK(worker_only().is_running_task);
854
855 CheckedAutoLock auto_lock(outer_->lock_);
856 DCHECK(!read_worker().blocking_start_time.is_null());
857 if (incremented_max_tasks_since_blocked_)
858 outer_->DecrementMaxTasksLockRequired();
859 else
860 --outer_->num_unresolved_may_block_;
861
862 if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
863 if (incremented_max_best_effort_tasks_since_blocked_)
864 outer_->DecrementMaxBestEffortTasksLockRequired();
865 else
866 --outer_->num_unresolved_best_effort_may_block_;
867 }
868
869 incremented_max_tasks_since_blocked_ = false;
870 incremented_max_best_effort_tasks_since_blocked_ = false;
871 write_worker().blocking_start_time = TimeTicks();
872 }
873
CanGetWorkLockRequired(ScopedCommandsExecutor * executor,WorkerThread * worker)874 bool ThreadGroupImpl::WorkerThreadDelegateImpl::CanGetWorkLockRequired(
875 ScopedCommandsExecutor* executor,
876 WorkerThread* worker) {
877 // To avoid searching through the idle stack : use GetLastUsedTime() not being
878 // null (or being directly on top of the idle stack) as a proxy for being on
879 // the idle stack.
880 const bool is_on_idle_workers_stack =
881 outer_->idle_workers_stack_.Peek() == worker ||
882 !worker->GetLastUsedTime().is_null();
883 DCHECK_EQ(is_on_idle_workers_stack,
884 outer_->idle_workers_stack_.Contains(worker));
885
886 if (is_on_idle_workers_stack) {
887 if (CanCleanupLockRequired(worker))
888 CleanupLockRequired(executor, worker);
889 return false;
890 }
891
892 // Excess workers should not get work, until they are no longer excess (i.e.
893 // max tasks increases). This ensures that if we have excess workers in the
894 // thread group, they get a chance to no longer be excess before being cleaned
895 // up.
896 if (outer_->GetNumAwakeWorkersLockRequired() >
897 outer_->GetDesiredNumAwakeWorkersLockRequired()) {
898 OnWorkerBecomesIdleLockRequired(worker);
899 return false;
900 }
901
902 return true;
903 }
904
905 void ThreadGroupImpl::WorkerThreadDelegateImpl::
MaybeIncrementMaxTasksLockRequired()906 MaybeIncrementMaxTasksLockRequired() {
907 if (read_any().blocking_start_time.is_null() ||
908 subtle::TimeTicksNowIgnoringOverride() - read_any().blocking_start_time <
909 outer_->after_start().may_block_threshold) {
910 return;
911 }
912
913 if (!incremented_max_tasks_since_blocked_) {
914 incremented_max_tasks_since_blocked_ = true;
915 --outer_->num_unresolved_may_block_;
916 outer_->IncrementMaxTasksLockRequired();
917 }
918 if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT &&
919 !incremented_max_best_effort_tasks_since_blocked_) {
920 incremented_max_best_effort_tasks_since_blocked_ = true;
921 --outer_->num_unresolved_best_effort_may_block_;
922 outer_->IncrementMaxBestEffortTasksLockRequired();
923 }
924 }
925
WaitForWorkersIdleLockRequiredForTesting(size_t n)926 void ThreadGroupImpl::WaitForWorkersIdleLockRequiredForTesting(size_t n) {
927 // Make sure workers do not cleanup while watching the idle count.
928 AutoReset<bool> ban_cleanups(&worker_cleanup_disallowed_for_testing_, true);
929
930 while (idle_workers_stack_.Size() < n)
931 idle_workers_stack_cv_for_testing_->Wait();
932 }
933
MaintainAtLeastOneIdleWorkerLockRequired(ScopedCommandsExecutor * executor)934 void ThreadGroupImpl::MaintainAtLeastOneIdleWorkerLockRequired(
935 ScopedCommandsExecutor* executor) {
936 if (workers_.size() == kMaxNumberOfWorkers)
937 return;
938 DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
939
940 if (!idle_workers_stack_.IsEmpty())
941 return;
942
943 if (workers_.size() >= max_tasks_)
944 return;
945
946 scoped_refptr<WorkerThread> new_worker =
947 CreateAndRegisterWorkerLockRequired(executor);
948 DCHECK(new_worker);
949 idle_workers_stack_.Push(new_worker.get());
950 }
951
952 scoped_refptr<WorkerThread>
CreateAndRegisterWorkerLockRequired(ScopedCommandsExecutor * executor)953 ThreadGroupImpl::CreateAndRegisterWorkerLockRequired(
954 ScopedCommandsExecutor* executor) {
955 DCHECK(!join_for_testing_started_);
956 DCHECK_LT(workers_.size(), max_tasks_);
957 DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
958 DCHECK(idle_workers_stack_.IsEmpty());
959
960 // WorkerThread needs |lock_| as a predecessor for its thread lock
961 // because in WakeUpOneWorker, |lock_| is first acquired and then
962 // the thread lock is acquired when WakeUp is called on the worker.
963 scoped_refptr<WorkerThread> worker =
964 MakeRefCounted<WorkerThread>(priority_hint_,
965 std::make_unique<WorkerThreadDelegateImpl>(
966 tracked_ref_factory_.GetTrackedRef()),
967 task_tracker_, &lock_);
968
969 workers_.push_back(worker);
970 executor->ScheduleStart(worker);
971 DCHECK_LE(workers_.size(), max_tasks_);
972
973 return worker;
974 }
975
GetNumAwakeWorkersLockRequired() const976 size_t ThreadGroupImpl::GetNumAwakeWorkersLockRequired() const {
977 DCHECK_GE(workers_.size(), idle_workers_stack_.Size());
978 size_t num_awake_workers = workers_.size() - idle_workers_stack_.Size();
979 DCHECK_GE(num_awake_workers, num_running_tasks_);
980 return num_awake_workers;
981 }
982
GetDesiredNumAwakeWorkersLockRequired() const983 size_t ThreadGroupImpl::GetDesiredNumAwakeWorkersLockRequired() const {
984 // Number of BEST_EFFORT task sources that are running or queued and allowed
985 // to run by the CanRunPolicy.
986 const size_t num_running_or_queued_can_run_best_effort_task_sources =
987 num_running_best_effort_tasks_ +
988 GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired();
989
990 const size_t workers_for_best_effort_task_sources =
991 std::max(std::min(num_running_or_queued_can_run_best_effort_task_sources,
992 max_best_effort_tasks_),
993 num_running_best_effort_tasks_);
994
995 // Number of USER_{VISIBLE|BLOCKING} task sources that are running or queued.
996 const size_t num_running_or_queued_foreground_task_sources =
997 (num_running_tasks_ - num_running_best_effort_tasks_) +
998 GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();
999
1000 const size_t workers_for_foreground_task_sources =
1001 num_running_or_queued_foreground_task_sources;
1002
1003 return std::min({workers_for_best_effort_task_sources +
1004 workers_for_foreground_task_sources,
1005 max_tasks_, kMaxNumberOfWorkers});
1006 }
1007
DidUpdateCanRunPolicy()1008 void ThreadGroupImpl::DidUpdateCanRunPolicy() {
1009 ScopedCommandsExecutor executor(this);
1010 CheckedAutoLock auto_lock(lock_);
1011 EnsureEnoughWorkersLockRequired(&executor);
1012 }
1013
EnsureEnoughWorkersLockRequired(BaseScopedCommandsExecutor * base_executor)1014 void ThreadGroupImpl::EnsureEnoughWorkersLockRequired(
1015 BaseScopedCommandsExecutor* base_executor) {
1016 // Don't do anything if the thread group isn't started.
1017 if (max_tasks_ == 0 || UNLIKELY(join_for_testing_started_))
1018 return;
1019
1020 ScopedCommandsExecutor* executor =
1021 static_cast<ScopedCommandsExecutor*>(base_executor);
1022
1023 const size_t desired_num_awake_workers =
1024 GetDesiredNumAwakeWorkersLockRequired();
1025 const size_t num_awake_workers = GetNumAwakeWorkersLockRequired();
1026
1027 size_t num_workers_to_wake_up =
1028 ClampSub(desired_num_awake_workers, num_awake_workers);
1029 if (after_start().wakeup_strategy == WakeUpStrategy::kExponentialWakeUps) {
1030 num_workers_to_wake_up = std::min(num_workers_to_wake_up, size_t(2U));
1031 } else if (after_start().wakeup_strategy ==
1032 WakeUpStrategy::kSerializedWakeUps) {
1033 num_workers_to_wake_up = std::min(num_workers_to_wake_up, size_t(1U));
1034 }
1035
1036 // Wake up the appropriate number of workers.
1037 for (size_t i = 0; i < num_workers_to_wake_up; ++i) {
1038 MaintainAtLeastOneIdleWorkerLockRequired(executor);
1039 WorkerThread* worker_to_wakeup = idle_workers_stack_.Pop();
1040 DCHECK(worker_to_wakeup);
1041 executor->ScheduleWakeUp(worker_to_wakeup);
1042 }
1043
1044 // In the case where the loop above didn't wake up any worker and we don't
1045 // have excess workers, the idle worker should be maintained. This happens
1046 // when called from the last worker awake, or a recent increase in |max_tasks|
1047 // now makes it possible to keep an idle worker.
1048 if (desired_num_awake_workers == num_awake_workers)
1049 MaintainAtLeastOneIdleWorkerLockRequired(executor);
1050
1051 // This function is called every time a task source is (re-)enqueued,
1052 // hence the minimum priority needs to be updated.
1053 UpdateMinAllowedPriorityLockRequired();
1054
1055 // Ensure that the number of workers is periodically adjusted if needed.
1056 MaybeScheduleAdjustMaxTasksLockRequired(executor);
1057 }
1058
AdjustMaxTasks()1059 void ThreadGroupImpl::AdjustMaxTasks() {
1060 DCHECK(
1061 after_start().service_thread_task_runner->RunsTasksInCurrentSequence());
1062
1063 ScopedCommandsExecutor executor(this);
1064 CheckedAutoLock auto_lock(lock_);
1065 DCHECK(adjust_max_tasks_posted_);
1066 adjust_max_tasks_posted_ = false;
1067
1068 // Increment max tasks for each worker that has been within a MAY_BLOCK
1069 // ScopedBlockingCall for more than may_block_threshold.
1070 for (scoped_refptr<WorkerThread> worker : workers_) {
1071 // The delegates of workers inside a ThreadGroupImpl should be
1072 // WorkerThreadDelegateImpls.
1073 WorkerThreadDelegateImpl* delegate =
1074 static_cast<WorkerThreadDelegateImpl*>(worker->delegate());
1075 AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
1076 delegate->MaybeIncrementMaxTasksLockRequired();
1077 }
1078
1079 // Wake up workers according to the updated |max_tasks_|. This will also
1080 // reschedule AdjustMaxTasks() if necessary.
1081 EnsureEnoughWorkersLockRequired(&executor);
1082 }
1083
ScheduleAdjustMaxTasks()1084 void ThreadGroupImpl::ScheduleAdjustMaxTasks() {
1085 // |adjust_max_tasks_posted_| can't change before the task posted below runs.
1086 // Skip check on NaCl to avoid unsafe reference acquisition warning.
1087 #if !defined(OS_NACL)
1088 DCHECK(TS_UNCHECKED_READ(adjust_max_tasks_posted_));
1089 #endif
1090
1091 after_start().service_thread_task_runner->PostDelayedTask(
1092 FROM_HERE, BindOnce(&ThreadGroupImpl::AdjustMaxTasks, Unretained(this)),
1093 after_start().blocked_workers_poll_period);
1094 }
1095
MaybeScheduleAdjustMaxTasksLockRequired(ScopedCommandsExecutor * executor)1096 void ThreadGroupImpl::MaybeScheduleAdjustMaxTasksLockRequired(
1097 ScopedCommandsExecutor* executor) {
1098 if (!adjust_max_tasks_posted_ &&
1099 ShouldPeriodicallyAdjustMaxTasksLockRequired()) {
1100 executor->ScheduleAdjustMaxTasks();
1101 adjust_max_tasks_posted_ = true;
1102 }
1103 }
1104
ShouldPeriodicallyAdjustMaxTasksLockRequired()1105 bool ThreadGroupImpl::ShouldPeriodicallyAdjustMaxTasksLockRequired() {
1106 // AdjustMaxTasks() should be scheduled to periodically adjust |max_tasks_|
1107 // and |max_best_effort_tasks_| when (1) the concurrency limits are not large
1108 // enough to accommodate all queued and running task sources and an idle
1109 // worker and (2) there are unresolved MAY_BLOCK ScopedBlockingCalls.
1110 // - When (1) is false: No worker would be created or woken up if the
1111 // concurrency limits were increased, so there is no hurry to increase them.
1112 // - When (2) is false: The concurrency limits could not be increased by
1113 // AdjustMaxTasks().
1114
1115 const size_t num_running_or_queued_best_effort_task_sources =
1116 num_running_best_effort_tasks_ +
1117 GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired();
1118 if (num_running_or_queued_best_effort_task_sources > max_best_effort_tasks_ &&
1119 num_unresolved_best_effort_may_block_ > 0) {
1120 return true;
1121 }
1122
1123 const size_t num_running_or_queued_task_sources =
1124 num_running_tasks_ +
1125 GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() +
1126 GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();
1127 constexpr size_t kIdleWorker = 1;
1128 return num_running_or_queued_task_sources + kIdleWorker > max_tasks_ &&
1129 num_unresolved_may_block_ > 0;
1130 }
1131
UpdateMinAllowedPriorityLockRequired()1132 void ThreadGroupImpl::UpdateMinAllowedPriorityLockRequired() {
1133 if (priority_queue_.IsEmpty() || num_running_tasks_ < max_tasks_) {
1134 max_allowed_sort_key_.store(kMaxYieldSortKey, std::memory_order_relaxed);
1135 } else {
1136 max_allowed_sort_key_.store({priority_queue_.PeekSortKey().priority(),
1137 priority_queue_.PeekSortKey().worker_count()},
1138 std::memory_order_relaxed);
1139 }
1140 }
1141
DecrementTasksRunningLockRequired(TaskPriority priority)1142 void ThreadGroupImpl::DecrementTasksRunningLockRequired(TaskPriority priority) {
1143 DCHECK_GT(num_running_tasks_, 0U);
1144 --num_running_tasks_;
1145 if (priority == TaskPriority::BEST_EFFORT) {
1146 DCHECK_GT(num_running_best_effort_tasks_, 0U);
1147 --num_running_best_effort_tasks_;
1148 }
1149 UpdateMinAllowedPriorityLockRequired();
1150 }
1151
IncrementTasksRunningLockRequired(TaskPriority priority)1152 void ThreadGroupImpl::IncrementTasksRunningLockRequired(TaskPriority priority) {
1153 ++num_running_tasks_;
1154 DCHECK_LE(num_running_tasks_, max_tasks_);
1155 DCHECK_LE(num_running_tasks_, kMaxNumberOfWorkers);
1156 if (priority == TaskPriority::BEST_EFFORT) {
1157 ++num_running_best_effort_tasks_;
1158 DCHECK_LE(num_running_best_effort_tasks_, num_running_tasks_);
1159 DCHECK_LE(num_running_best_effort_tasks_, max_best_effort_tasks_);
1160 }
1161 UpdateMinAllowedPriorityLockRequired();
1162 }
1163
DecrementMaxTasksLockRequired()1164 void ThreadGroupImpl::DecrementMaxTasksLockRequired() {
1165 DCHECK_GT(num_running_tasks_, 0U);
1166 DCHECK_GT(max_tasks_, 0U);
1167 --max_tasks_;
1168 UpdateMinAllowedPriorityLockRequired();
1169 }
1170
IncrementMaxTasksLockRequired()1171 void ThreadGroupImpl::IncrementMaxTasksLockRequired() {
1172 DCHECK_GT(num_running_tasks_, 0U);
1173 ++max_tasks_;
1174 UpdateMinAllowedPriorityLockRequired();
1175 }
1176
DecrementMaxBestEffortTasksLockRequired()1177 void ThreadGroupImpl::DecrementMaxBestEffortTasksLockRequired() {
1178 DCHECK_GT(num_running_tasks_, 0U);
1179 DCHECK_GT(max_best_effort_tasks_, 0U);
1180 --max_best_effort_tasks_;
1181 UpdateMinAllowedPriorityLockRequired();
1182 }
1183
IncrementMaxBestEffortTasksLockRequired()1184 void ThreadGroupImpl::IncrementMaxBestEffortTasksLockRequired() {
1185 DCHECK_GT(num_running_tasks_, 0U);
1186 ++max_best_effort_tasks_;
1187 UpdateMinAllowedPriorityLockRequired();
1188 }
1189
1190 ThreadGroupImpl::InitializedInStart::InitializedInStart() = default;
1191 ThreadGroupImpl::InitializedInStart::~InitializedInStart() = default;
1192
1193 } // namespace internal
1194 } // namespace base
1195