1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/sequence_manager/thread_controller_impl.h"
6
7 #include <algorithm>
8
9 #include "base/bind.h"
10 #include "base/memory/ptr_util.h"
11 #include "base/message_loop/message_pump.h"
12 #include "base/run_loop.h"
13 #include "base/task/sequence_manager/lazy_now.h"
14 #include "base/task/sequence_manager/sequence_manager_impl.h"
15 #include "base/task/sequence_manager/sequenced_task_source.h"
16 #include "base/trace_event/base_tracing.h"
17
18 namespace base {
19 namespace sequence_manager {
20 namespace internal {
21
// File-local shorthand for WorkDeduplicator::ShouldScheduleWork, the value the
// scheduling methods below compare against to decide whether to post a DoWork.
22 using ShouldScheduleWork = WorkDeduplicator::ShouldScheduleWork;
23
ThreadControllerImpl(SequenceManagerImpl * funneled_sequence_manager,scoped_refptr<SingleThreadTaskRunner> task_runner,const TickClock * time_source)24 ThreadControllerImpl::ThreadControllerImpl(
25 SequenceManagerImpl* funneled_sequence_manager,
26 scoped_refptr<SingleThreadTaskRunner> task_runner,
27 const TickClock* time_source)
28 : funneled_sequence_manager_(funneled_sequence_manager),
29 task_runner_(task_runner),
30 associated_thread_(AssociatedThreadId::CreateUnbound()),
31 message_loop_task_runner_(funneled_sequence_manager
32 ? funneled_sequence_manager->GetTaskRunner()
33 : nullptr),
34 time_source_(time_source),
35 work_deduplicator_(associated_thread_) {
36 if (task_runner_ || funneled_sequence_manager_)
37 work_deduplicator_.BindToCurrentThread();
38 immediate_do_work_closure_ =
39 BindRepeating(&ThreadControllerImpl::DoWork, weak_factory_.GetWeakPtr(),
40 WorkType::kImmediate);
41 delayed_do_work_closure_ =
42 BindRepeating(&ThreadControllerImpl::DoWork, weak_factory_.GetWeakPtr(),
43 WorkType::kDelayed);
44
45 // Unlike ThreadControllerWithMessagePumpImpl, ThreadControllerImpl isn't
46 // explicitly Run(). Rather, DoWork() will be invoked at some point in the
47 // future when the associated thread begins pumping messages.
48 main_sequence_only().run_level_tracker.OnRunLoopStarted(
49 RunLevelTracker::kIdle);
50 }
51
~ThreadControllerImpl()52 ThreadControllerImpl::~ThreadControllerImpl() {
53 // Balances OnRunLoopStarted() in the constructor to satisfy the exit criteria
54 // of ~RunLevelTracker().
55 main_sequence_only().run_level_tracker.OnRunLoopEnded();
56 }
57
// Out-of-line defaulted constructor for the MainSequenceOnly state holder.
58 ThreadControllerImpl::MainSequenceOnly::MainSequenceOnly() = default;
59
// Out-of-line defaulted destructor for the MainSequenceOnly state holder.
60 ThreadControllerImpl::MainSequenceOnly::~MainSequenceOnly() = default;
61
Create(SequenceManagerImpl * funneled_sequence_manager,const TickClock * time_source)62 std::unique_ptr<ThreadControllerImpl> ThreadControllerImpl::Create(
63 SequenceManagerImpl* funneled_sequence_manager,
64 const TickClock* time_source) {
65 return WrapUnique(new ThreadControllerImpl(
66 funneled_sequence_manager,
67 funneled_sequence_manager ? funneled_sequence_manager->GetTaskRunner()
68 : nullptr,
69 time_source));
70 }
71
SetSequencedTaskSource(SequencedTaskSource * sequence)72 void ThreadControllerImpl::SetSequencedTaskSource(
73 SequencedTaskSource* sequence) {
74 DCHECK_CALLED_ON_VALID_SEQUENCE(associated_thread_->sequence_checker);
75 DCHECK(sequence);
76 DCHECK(!sequence_);
77 sequence_ = sequence;
78 }
79
SetTimerSlack(TimerSlack timer_slack)80 void ThreadControllerImpl::SetTimerSlack(TimerSlack timer_slack) {
81 if (!funneled_sequence_manager_)
82 return;
83 funneled_sequence_manager_->SetTimerSlack(timer_slack);
84 }
85
ScheduleWork()86 void ThreadControllerImpl::ScheduleWork() {
87 TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("sequence_manager"),
88 "ThreadControllerImpl::ScheduleWork::PostTask");
89
90 if (work_deduplicator_.OnWorkRequested() ==
91 ShouldScheduleWork::kScheduleImmediate) {
92 task_runner_->PostTask(FROM_HERE, immediate_do_work_closure_);
93 }
94 }
95
SetNextDelayedDoWork(LazyNow * lazy_now,TimeTicks run_time)96 void ThreadControllerImpl::SetNextDelayedDoWork(LazyNow* lazy_now,
97 TimeTicks run_time) {
98 DCHECK_CALLED_ON_VALID_SEQUENCE(associated_thread_->sequence_checker);
99 DCHECK(sequence_);
100
101 if (main_sequence_only().next_delayed_do_work == run_time)
102 return;
103
104 // Cancel DoWork if it was scheduled and we set an "infinite" delay now.
105 if (run_time == TimeTicks::Max()) {
106 cancelable_delayed_do_work_closure_.Cancel();
107 main_sequence_only().next_delayed_do_work = TimeTicks::Max();
108 return;
109 }
110
111 if (work_deduplicator_.OnDelayedWorkRequested() ==
112 ShouldScheduleWork::kNotNeeded) {
113 return;
114 }
115
116 base::TimeDelta delay = std::max(TimeDelta(), run_time - lazy_now->Now());
117 TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("sequence_manager"),
118 "ThreadControllerImpl::SetNextDelayedDoWork::PostDelayedTask",
119 "delay_ms", delay.InMillisecondsF());
120
121 main_sequence_only().next_delayed_do_work = run_time;
122 // Reset also causes cancellation of the previous DoWork task.
123 cancelable_delayed_do_work_closure_.Reset(delayed_do_work_closure_);
124 task_runner_->PostDelayedTask(
125 FROM_HERE, cancelable_delayed_do_work_closure_.callback(), delay);
126 }
127
RunsTasksInCurrentSequence()128 bool ThreadControllerImpl::RunsTasksInCurrentSequence() {
129 return task_runner_->RunsTasksInCurrentSequence();
130 }
131
GetClock()132 const TickClock* ThreadControllerImpl::GetClock() {
133 return time_source_;
134 }
135
SetDefaultTaskRunner(scoped_refptr<SingleThreadTaskRunner> task_runner)136 void ThreadControllerImpl::SetDefaultTaskRunner(
137 scoped_refptr<SingleThreadTaskRunner> task_runner) {
138 #if DCHECK_IS_ON()
139 default_task_runner_set_ = true;
140 #endif
141 if (!funneled_sequence_manager_)
142 return;
143 funneled_sequence_manager_->SetTaskRunner(task_runner);
144 }
145
146 scoped_refptr<SingleThreadTaskRunner>
GetDefaultTaskRunner()147 ThreadControllerImpl::GetDefaultTaskRunner() {
148 return funneled_sequence_manager_->GetTaskRunner();
149 }
150
RestoreDefaultTaskRunner()151 void ThreadControllerImpl::RestoreDefaultTaskRunner() {
152 if (!funneled_sequence_manager_)
153 return;
154 funneled_sequence_manager_->SetTaskRunner(message_loop_task_runner_);
155 }
156
BindToCurrentThread(std::unique_ptr<MessagePump> message_pump)157 void ThreadControllerImpl::BindToCurrentThread(
158 std::unique_ptr<MessagePump> message_pump) {
159 NOTREACHED();
160 }
161
WillQueueTask(PendingTask * pending_task,const char * task_queue_name)162 void ThreadControllerImpl::WillQueueTask(PendingTask* pending_task,
163 const char* task_queue_name) {
164 task_annotator_.WillQueueTask("SequenceManager PostTask", pending_task,
165 task_queue_name);
166 }
167
DoWork(WorkType work_type)168 void ThreadControllerImpl::DoWork(WorkType work_type) {
169 TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("sequence_manager"),
170 "ThreadControllerImpl::DoWork");
171
172 DCHECK_CALLED_ON_VALID_SEQUENCE(associated_thread_->sequence_checker);
173 DCHECK(sequence_);
174
175 work_deduplicator_.OnWorkStarted();
176
177 WeakPtr<ThreadControllerImpl> weak_ptr = weak_factory_.GetWeakPtr();
178 // TODO(scheduler-dev): Consider moving to a time based work batch instead.
179 for (int i = 0; i < main_sequence_only().work_batch_size_; i++) {
180 Task* task = sequence_->SelectNextTask();
181 if (!task)
182 break;
183
184 // [OnTaskStarted(), OnTaskEnded()] must outscope all other tracing calls
185 // so that the "ThreadController active" trace event lives on top of all
186 // "run task" events. It must also encompass DidRunTask() to cover
187 // microtasks.
188 DCHECK_GT(main_sequence_only().run_level_tracker.num_run_levels(), 0U);
189 main_sequence_only().run_level_tracker.OnTaskStarted();
190 {
191 // Trace-parsing tools (DevTools, Lighthouse, etc) consume this event
192 // to determine long tasks.
193 // The event scope must span across DidRunTask call below to make sure
194 // it covers RunMicrotasks event.
195 // See https://crbug.com/681863 and https://crbug.com/874982
196 TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("devtools.timeline"), "RunTask");
197
198 {
199 // Trace events should finish before we call DidRunTask to ensure that
200 // SequenceManager trace events do not interfere with them.
201 TRACE_TASK_EXECUTION("ThreadControllerImpl::RunTask", *task);
202 task_annotator_.RunTask("SequenceManager RunTask", task);
203 if (!weak_ptr)
204 return;
205 }
206
207 sequence_->DidRunTask();
208 }
209 main_sequence_only().run_level_tracker.OnTaskEnded();
210
211 // NOTE: https://crbug.com/828835.
212 // When we're running inside a nested RunLoop it may quit anytime, so any
213 // outstanding pending tasks must run in the outer RunLoop
214 // (see SequenceManagerTestWithMessageLoop.QuitWhileNested test).
215 // Unfortunately, it's MessageLoop who's receiving that signal and we can't
216 // know it before we return from DoWork, hence, OnExitNestedRunLoop
217 // will be called later. Since we must implement ThreadController and
218 // SequenceManager in conformance with MessageLoop task runners, we need
219 // to disable this batching optimization while nested.
220 // Implementing MessagePump::Delegate ourselves will help to resolve this
221 // issue.
222 if (main_sequence_only().run_level_tracker.num_run_levels() > 1)
223 break;
224 }
225
226 work_deduplicator_.WillCheckForMoreWork();
227
228 LazyNow lazy_now(time_source_);
229 TimeDelta delay_till_next_task = sequence_->DelayTillNextTask(&lazy_now);
230 // The OnSystemIdle callback allows the TimeDomains to advance virtual time
231 // in which case we now have immediate word to do.
232 if (delay_till_next_task <= TimeDelta() || sequence_->OnSystemIdle()) {
233 // The next task needs to run immediately, post a continuation if
234 // another thread didn't get there first.
235 if (work_deduplicator_.DidCheckForMoreWork(
236 WorkDeduplicator::NextTask::kIsImmediate) ==
237 ShouldScheduleWork::kScheduleImmediate) {
238 task_runner_->PostTask(FROM_HERE, immediate_do_work_closure_);
239 }
240 return;
241 }
242
243 // It looks like we have a non-zero delay, however another thread may have
244 // posted an immediate task while we computed the delay.
245 if (work_deduplicator_.DidCheckForMoreWork(
246 WorkDeduplicator::NextTask::kIsDelayed) ==
247 ShouldScheduleWork::kScheduleImmediate) {
248 task_runner_->PostTask(FROM_HERE, immediate_do_work_closure_);
249 return;
250 }
251
252 // No more immediate work.
253 main_sequence_only().run_level_tracker.OnIdle();
254
255 // Any future work?
256 if (delay_till_next_task == TimeDelta::Max()) {
257 main_sequence_only().next_delayed_do_work = TimeTicks::Max();
258 cancelable_delayed_do_work_closure_.Cancel();
259 return;
260 }
261
262 // Already requested next delay?
263 TimeTicks next_task_at = lazy_now.Now() + delay_till_next_task;
264 if (next_task_at == main_sequence_only().next_delayed_do_work)
265 return;
266
267 // Schedule a callback after |delay_till_next_task| and cancel any previous
268 // callback.
269 main_sequence_only().next_delayed_do_work = next_task_at;
270 cancelable_delayed_do_work_closure_.Reset(delayed_do_work_closure_);
271 task_runner_->PostDelayedTask(FROM_HERE,
272 cancelable_delayed_do_work_closure_.callback(),
273 delay_till_next_task);
274 }
275
AddNestingObserver(RunLoop::NestingObserver * observer)276 void ThreadControllerImpl::AddNestingObserver(
277 RunLoop::NestingObserver* observer) {
278 DCHECK_CALLED_ON_VALID_SEQUENCE(associated_thread_->sequence_checker);
279 nesting_observer_ = observer;
280 RunLoop::AddNestingObserverOnCurrentThread(this);
281 }
282
RemoveNestingObserver(RunLoop::NestingObserver * observer)283 void ThreadControllerImpl::RemoveNestingObserver(
284 RunLoop::NestingObserver* observer) {
285 DCHECK_CALLED_ON_VALID_SEQUENCE(associated_thread_->sequence_checker);
286 DCHECK_EQ(observer, nesting_observer_);
287 nesting_observer_ = nullptr;
288 RunLoop::RemoveNestingObserverOnCurrentThread(this);
289 }
290
291 const scoped_refptr<AssociatedThreadId>&
GetAssociatedThread() const292 ThreadControllerImpl::GetAssociatedThread() const {
293 return associated_thread_;
294 }
295
OnBeginNestedRunLoop()296 void ThreadControllerImpl::OnBeginNestedRunLoop() {
297 main_sequence_only().run_level_tracker.OnRunLoopStarted(
298 RunLevelTracker::kSelectingNextTask);
299
300 // Just assume we have a pending task and post a DoWork to make sure we don't
301 // grind to a halt while nested.
302 work_deduplicator_.OnWorkRequested(); // Set the pending DoWork flag.
303 task_runner_->PostTask(FROM_HERE, immediate_do_work_closure_);
304
305 if (nesting_observer_)
306 nesting_observer_->OnBeginNestedRunLoop();
307 }
308
OnExitNestedRunLoop()309 void ThreadControllerImpl::OnExitNestedRunLoop() {
310 if (nesting_observer_)
311 nesting_observer_->OnExitNestedRunLoop();
312 main_sequence_only().run_level_tracker.OnRunLoopEnded();
313 }
314
SetWorkBatchSize(int work_batch_size)315 void ThreadControllerImpl::SetWorkBatchSize(int work_batch_size) {
316 main_sequence_only().work_batch_size_ = work_batch_size;
317 }
318
SetTaskExecutionAllowed(bool allowed)319 void ThreadControllerImpl::SetTaskExecutionAllowed(bool allowed) {
320 NOTREACHED();
321 }
322
IsTaskExecutionAllowed() const323 bool ThreadControllerImpl::IsTaskExecutionAllowed() const {
324 return true;
325 }
326
ShouldQuitRunLoopWhenIdle()327 bool ThreadControllerImpl::ShouldQuitRunLoopWhenIdle() {
328 // The MessageLoop does not expose the API needed to support this query.
329 return false;
330 }
331
GetBoundMessagePump() const332 MessagePump* ThreadControllerImpl::GetBoundMessagePump() const {
333 return nullptr;
334 }
335
#if defined(OS_IOS) || defined(OS_ANDROID)
// Only compiled on iOS and Android. Not expected to be called on this
// implementation; the body only hits NOTREACHED().
void ThreadControllerImpl::AttachToMessagePump() {
  NOTREACHED();
}
#endif  // OS_IOS || OS_ANDROID
341
#if defined(OS_IOS)
// Only compiled on iOS. Not expected to be called on this implementation; the
// body only hits NOTREACHED().
void ThreadControllerImpl::DetachFromMessagePump() {
  NOTREACHED();
}
#endif  // OS_IOS
347
348 } // namespace internal
349 } // namespace sequence_manager
350 } // namespace base
351