1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h"
6
7 #include <memory>
8 #include <utility>
9
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/sequenced_task_runner.h"
15 #include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
16 #include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h"
17 #include "chrome/browser/sync_file_system/sync_file_metadata.h"
18
19 using storage::FileSystemURL;
20
21 namespace sync_file_system {
22 namespace drive_backend {
23
24 namespace {
25
26 class SyncTaskAdapter : public ExclusiveTask {
27 public:
SyncTaskAdapter(const SyncTaskManager::Task & task)28 explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {}
~SyncTaskAdapter()29 ~SyncTaskAdapter() override {}
30
RunExclusive(const SyncStatusCallback & callback)31 void RunExclusive(const SyncStatusCallback& callback) override {
32 task_.Run(callback);
33 }
34
35 private:
36 SyncTaskManager::Task task_;
37
38 DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter);
39 };
40
41 } // namespace
42
PendingTask()43 SyncTaskManager::PendingTask::PendingTask() {}
44
PendingTask(const base::Closure & task,Priority pri,int seq)45 SyncTaskManager::PendingTask::PendingTask(
46 const base::Closure& task, Priority pri, int seq)
47 : task(task), priority(pri), seq(seq) {}
48
49 SyncTaskManager::PendingTask::PendingTask(const PendingTask& other) = default;
50
~PendingTask()51 SyncTaskManager::PendingTask::~PendingTask() {}
52
operator ()(const PendingTask & left,const PendingTask & right) const53 bool SyncTaskManager::PendingTaskComparator::operator()(
54 const PendingTask& left,
55 const PendingTask& right) const {
56 if (left.priority != right.priority)
57 return left.priority < right.priority;
58 return left.seq > right.seq;
59 }
60
SyncTaskManager(base::WeakPtr<Client> client,size_t maximum_background_task,const scoped_refptr<base::SequencedTaskRunner> & task_runner)61 SyncTaskManager::SyncTaskManager(
62 base::WeakPtr<Client> client,
63 size_t maximum_background_task,
64 const scoped_refptr<base::SequencedTaskRunner>& task_runner)
65 : client_(client),
66 maximum_background_task_(maximum_background_task),
67 pending_task_seq_(0),
68 task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID),
69 task_runner_(task_runner) {}
70
~SyncTaskManager()71 SyncTaskManager::~SyncTaskManager() {
72 weak_ptr_factory_.InvalidateWeakPtrs();
73
74 client_.reset();
75 token_.reset();
76 }
77
Initialize(SyncStatusCode status)78 void SyncTaskManager::Initialize(SyncStatusCode status) {
79 DCHECK(sequence_checker_.CalledOnValidSequence());
80 DCHECK(!token_);
81 NotifyTaskDone(
82 SyncTaskToken::CreateForForegroundTask(
83 weak_ptr_factory_.GetWeakPtr(), task_runner_.get()),
84 status);
85 }
86
ScheduleTask(const base::Location & from_here,const Task & task,Priority priority,const SyncStatusCallback & callback)87 void SyncTaskManager::ScheduleTask(const base::Location& from_here,
88 const Task& task,
89 Priority priority,
90 const SyncStatusCallback& callback) {
91 DCHECK(sequence_checker_.CalledOnValidSequence());
92
93 ScheduleSyncTask(from_here,
94 std::unique_ptr<SyncTask>(new SyncTaskAdapter(task)),
95 priority, callback);
96 }
97
ScheduleSyncTask(const base::Location & from_here,std::unique_ptr<SyncTask> task,Priority priority,const SyncStatusCallback & callback)98 void SyncTaskManager::ScheduleSyncTask(const base::Location& from_here,
99 std::unique_ptr<SyncTask> task,
100 Priority priority,
101 const SyncStatusCallback& callback) {
102 DCHECK(sequence_checker_.CalledOnValidSequence());
103
104 std::unique_ptr<SyncTaskToken> token(GetToken(from_here, callback));
105 if (!token) {
106 PushPendingTask(
107 base::Bind(&SyncTaskManager::ScheduleSyncTask,
108 weak_ptr_factory_.GetWeakPtr(), from_here,
109 base::Passed(&task), priority, callback),
110 priority);
111 return;
112 }
113 RunTask(std::move(token), std::move(task));
114 }
115
ScheduleTaskIfIdle(const base::Location & from_here,const Task & task,const SyncStatusCallback & callback)116 bool SyncTaskManager::ScheduleTaskIfIdle(const base::Location& from_here,
117 const Task& task,
118 const SyncStatusCallback& callback) {
119 DCHECK(sequence_checker_.CalledOnValidSequence());
120
121 return ScheduleSyncTaskIfIdle(
122 from_here, std::unique_ptr<SyncTask>(new SyncTaskAdapter(task)),
123 callback);
124 }
125
ScheduleSyncTaskIfIdle(const base::Location & from_here,std::unique_ptr<SyncTask> task,const SyncStatusCallback & callback)126 bool SyncTaskManager::ScheduleSyncTaskIfIdle(
127 const base::Location& from_here,
128 std::unique_ptr<SyncTask> task,
129 const SyncStatusCallback& callback) {
130 DCHECK(sequence_checker_.CalledOnValidSequence());
131
132 std::unique_ptr<SyncTaskToken> token(GetToken(from_here, callback));
133 if (!token)
134 return false;
135 RunTask(std::move(token), std::move(task));
136 return true;
137 }
138
139 // static
NotifyTaskDone(std::unique_ptr<SyncTaskToken> token,SyncStatusCode status)140 void SyncTaskManager::NotifyTaskDone(std::unique_ptr<SyncTaskToken> token,
141 SyncStatusCode status) {
142 DCHECK(token);
143
144 SyncTaskManager* manager = token->manager();
145 if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
146 DCHECK(!manager);
147 SyncStatusCallback callback = token->callback();
148 token->clear_callback();
149 callback.Run(status);
150 return;
151 }
152
153 if (manager)
154 manager->NotifyTaskDoneBody(std::move(token), status);
155 }
156
157 // static
UpdateTaskBlocker(std::unique_ptr<SyncTaskToken> current_task_token,std::unique_ptr<TaskBlocker> task_blocker,const Continuation & continuation)158 void SyncTaskManager::UpdateTaskBlocker(
159 std::unique_ptr<SyncTaskToken> current_task_token,
160 std::unique_ptr<TaskBlocker> task_blocker,
161 const Continuation& continuation) {
162 DCHECK(current_task_token);
163
164 SyncTaskManager* manager = current_task_token->manager();
165 if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
166 DCHECK(!manager);
167 continuation.Run(std::move(current_task_token));
168 return;
169 }
170
171 if (!manager)
172 return;
173
174 std::unique_ptr<SyncTaskToken> foreground_task_token;
175 std::unique_ptr<SyncTaskToken> background_task_token;
176 std::unique_ptr<TaskLogger::TaskLog> task_log =
177 current_task_token->PassTaskLog();
178 if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID)
179 foreground_task_token = std::move(current_task_token);
180 else
181 background_task_token = std::move(current_task_token);
182
183 manager->UpdateTaskBlockerBody(
184 std::move(foreground_task_token), std::move(background_task_token),
185 std::move(task_log), std::move(task_blocker), continuation);
186 }
187
IsRunningTask(int64_t token_id) const188 bool SyncTaskManager::IsRunningTask(int64_t token_id) const {
189 DCHECK(sequence_checker_.CalledOnValidSequence());
190
191 // If the client is gone, all task should be aborted.
192 if (!client_)
193 return false;
194
195 if (token_id == SyncTaskToken::kForegroundTaskTokenID)
196 return true;
197
198 return running_background_tasks_.find(token_id) !=
199 running_background_tasks_.end();
200 }
201
DetachFromSequence()202 void SyncTaskManager::DetachFromSequence() {
203 sequence_checker_.DetachFromSequence();
204 }
205
NotifyTaskDoneBody(std::unique_ptr<SyncTaskToken> token,SyncStatusCode status)206 void SyncTaskManager::NotifyTaskDoneBody(std::unique_ptr<SyncTaskToken> token,
207 SyncStatusCode status) {
208 DCHECK(sequence_checker_.CalledOnValidSequence());
209 DCHECK(token);
210
211 DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status
212 << " (" << SyncStatusCodeToString(status) << ")"
213 << " " << token->location().ToString();
214
215 if (token->task_blocker()) {
216 dependency_manager_.Erase(token->task_blocker());
217 token->clear_task_blocker();
218 }
219
220 if (client_) {
221 if (token->has_task_log()) {
222 token->FinalizeTaskLog(SyncStatusCodeToString(status));
223 client_->RecordTaskLog(token->PassTaskLog());
224 }
225 }
226
227 std::unique_ptr<SyncTask> task;
228 SyncStatusCallback callback = token->callback();
229 token->clear_callback();
230 if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) {
231 token_ = std::move(token);
232 task = std::move(running_foreground_task_);
233 } else {
234 task = std::move(running_background_tasks_[token->token_id()]);
235 running_background_tasks_.erase(token->token_id());
236 }
237
238 // Acquire the token to prevent a new task to jump into the queue.
239 token = std::move(token_);
240
241 bool task_used_network = false;
242 if (task)
243 task_used_network = task->used_network();
244
245 if (client_)
246 client_->NotifyLastOperationStatus(status, task_used_network);
247
248 if (!callback.is_null())
249 callback.Run(status);
250
251 // Post MaybeStartNextForegroundTask rather than calling it directly to avoid
252 // making the call-chaing longer.
253 task_runner_->PostTask(
254 FROM_HERE,
255 base::BindOnce(&SyncTaskManager::MaybeStartNextForegroundTask,
256 weak_ptr_factory_.GetWeakPtr(), std::move(token)));
257 }
258
UpdateTaskBlockerBody(std::unique_ptr<SyncTaskToken> foreground_task_token,std::unique_ptr<SyncTaskToken> background_task_token,std::unique_ptr<TaskLogger::TaskLog> task_log,std::unique_ptr<TaskBlocker> task_blocker,const Continuation & continuation)259 void SyncTaskManager::UpdateTaskBlockerBody(
260 std::unique_ptr<SyncTaskToken> foreground_task_token,
261 std::unique_ptr<SyncTaskToken> background_task_token,
262 std::unique_ptr<TaskLogger::TaskLog> task_log,
263 std::unique_ptr<TaskBlocker> task_blocker,
264 const Continuation& continuation) {
265 DCHECK(sequence_checker_.CalledOnValidSequence());
266
267 // Run the task directly if the parallelization is disabled.
268 if (!maximum_background_task_) {
269 DCHECK(foreground_task_token);
270 DCHECK(!background_task_token);
271 foreground_task_token->SetTaskLog(std::move(task_log));
272 continuation.Run(std::move(foreground_task_token));
273 return;
274 }
275
276 // Clear existing |task_blocker| from |dependency_manager_| before
277 // getting |foreground_task_token|, so that we can avoid dead lock.
278 if (background_task_token && background_task_token->task_blocker()) {
279 dependency_manager_.Erase(background_task_token->task_blocker());
280 background_task_token->clear_task_blocker();
281 }
282
283 // Try to get |foreground_task_token|. If it's not available, wait for
284 // current foreground task to finish.
285 if (!foreground_task_token) {
286 DCHECK(background_task_token);
287 foreground_task_token = GetToken(background_task_token->location(),
288 SyncStatusCallback());
289 if (!foreground_task_token) {
290 PushPendingTask(
291 base::Bind(&SyncTaskManager::UpdateTaskBlockerBody,
292 weak_ptr_factory_.GetWeakPtr(),
293 base::Passed(&foreground_task_token),
294 base::Passed(&background_task_token),
295 base::Passed(&task_log),
296 base::Passed(&task_blocker),
297 continuation),
298 PRIORITY_HIGH);
299 MaybeStartNextForegroundTask(nullptr);
300 return;
301 }
302 }
303
304 // Check if the task can run as a background task now.
305 // If there are too many task running or any other task blocks current
306 // task, wait for any other task to finish.
307 bool task_number_limit_exceeded =
308 !background_task_token &&
309 running_background_tasks_.size() >= maximum_background_task_;
310 if (task_number_limit_exceeded ||
311 !dependency_manager_.Insert(task_blocker.get())) {
312 DCHECK(!running_background_tasks_.empty());
313 DCHECK(pending_backgrounding_task_.is_null());
314
315 // Wait for NotifyTaskDone to release a |task_blocker|.
316 pending_backgrounding_task_ =
317 base::Bind(&SyncTaskManager::UpdateTaskBlockerBody,
318 weak_ptr_factory_.GetWeakPtr(),
319 base::Passed(&foreground_task_token),
320 base::Passed(&background_task_token),
321 base::Passed(&task_log),
322 base::Passed(&task_blocker),
323 continuation);
324 return;
325 }
326
327 if (background_task_token) {
328 background_task_token->set_task_blocker(std::move(task_blocker));
329 } else {
330 base::Location from_here = foreground_task_token->location();
331 SyncStatusCallback callback = foreground_task_token->callback();
332 foreground_task_token->clear_callback();
333
334 background_task_token = SyncTaskToken::CreateForBackgroundTask(
335 weak_ptr_factory_.GetWeakPtr(), task_runner_.get(), task_token_seq_++,
336 std::move(task_blocker));
337 background_task_token->UpdateTask(from_here, callback);
338 running_background_tasks_[background_task_token->token_id()] =
339 std::move(running_foreground_task_);
340 }
341
342 token_ = std::move(foreground_task_token);
343 MaybeStartNextForegroundTask(nullptr);
344 background_task_token->SetTaskLog(std::move(task_log));
345 continuation.Run(std::move(background_task_token));
346 }
347
GetToken(const base::Location & from_here,const SyncStatusCallback & callback)348 std::unique_ptr<SyncTaskToken> SyncTaskManager::GetToken(
349 const base::Location& from_here,
350 const SyncStatusCallback& callback) {
351 DCHECK(sequence_checker_.CalledOnValidSequence());
352
353 if (!token_)
354 return nullptr;
355 token_->UpdateTask(from_here, callback);
356 return std::move(token_);
357 }
358
PushPendingTask(const base::Closure & closure,Priority priority)359 void SyncTaskManager::PushPendingTask(
360 const base::Closure& closure, Priority priority) {
361 DCHECK(sequence_checker_.CalledOnValidSequence());
362
363 pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++));
364 }
365
RunTask(std::unique_ptr<SyncTaskToken> token,std::unique_ptr<SyncTask> task)366 void SyncTaskManager::RunTask(std::unique_ptr<SyncTaskToken> token,
367 std::unique_ptr<SyncTask> task) {
368 DCHECK(sequence_checker_.CalledOnValidSequence());
369 DCHECK(!running_foreground_task_);
370
371 running_foreground_task_ = std::move(task);
372 running_foreground_task_->RunPreflight(std::move(token));
373 }
374
MaybeStartNextForegroundTask(std::unique_ptr<SyncTaskToken> token)375 void SyncTaskManager::MaybeStartNextForegroundTask(
376 std::unique_ptr<SyncTaskToken> token) {
377 DCHECK(sequence_checker_.CalledOnValidSequence());
378
379 if (token) {
380 DCHECK(!token_);
381 token_ = std::move(token);
382 }
383
384 if (!pending_backgrounding_task_.is_null()) {
385 base::Closure closure = pending_backgrounding_task_;
386 pending_backgrounding_task_.Reset();
387 closure.Run();
388 return;
389 }
390
391 if (!token_)
392 return;
393
394 if (!pending_tasks_.empty()) {
395 base::Closure closure = pending_tasks_.top().task;
396 pending_tasks_.pop();
397 closure.Run();
398 return;
399 }
400
401 if (client_)
402 client_->MaybeScheduleNextTask();
403 }
404
405 } // namespace drive_backend
406 } // namespace sync_file_system
407