1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h"
6 
7 #include <memory>
8 #include <utility>
9 
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/sequenced_task_runner.h"
15 #include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
16 #include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h"
17 #include "chrome/browser/sync_file_system/sync_file_metadata.h"
18 
19 using storage::FileSystemURL;
20 
21 namespace sync_file_system {
22 namespace drive_backend {
23 
24 namespace {
25 
26 class SyncTaskAdapter : public ExclusiveTask {
27  public:
SyncTaskAdapter(const SyncTaskManager::Task & task)28   explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {}
~SyncTaskAdapter()29   ~SyncTaskAdapter() override {}
30 
RunExclusive(const SyncStatusCallback & callback)31   void RunExclusive(const SyncStatusCallback& callback) override {
32     task_.Run(callback);
33   }
34 
35  private:
36   SyncTaskManager::Task task_;
37 
38   DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter);
39 };
40 
41 }  // namespace
42 
PendingTask()43 SyncTaskManager::PendingTask::PendingTask() {}
44 
PendingTask(const base::Closure & task,Priority pri,int seq)45 SyncTaskManager::PendingTask::PendingTask(
46     const base::Closure& task, Priority pri, int seq)
47     : task(task), priority(pri), seq(seq) {}
48 
// Copy constructor: member-wise copy, defined out of line.
SyncTaskManager::PendingTask::PendingTask(const PendingTask& other) = default;
50 
~PendingTask()51 SyncTaskManager::PendingTask::~PendingTask() {}
52 
operator ()(const PendingTask & left,const PendingTask & right) const53 bool SyncTaskManager::PendingTaskComparator::operator()(
54     const PendingTask& left,
55     const PendingTask& right) const {
56   if (left.priority != right.priority)
57     return left.priority < right.priority;
58   return left.seq > right.seq;
59 }
60 
SyncTaskManager(base::WeakPtr<Client> client,size_t maximum_background_task,const scoped_refptr<base::SequencedTaskRunner> & task_runner)61 SyncTaskManager::SyncTaskManager(
62     base::WeakPtr<Client> client,
63     size_t maximum_background_task,
64     const scoped_refptr<base::SequencedTaskRunner>& task_runner)
65     : client_(client),
66       maximum_background_task_(maximum_background_task),
67       pending_task_seq_(0),
68       task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID),
69       task_runner_(task_runner) {}
70 
~SyncTaskManager()71 SyncTaskManager::~SyncTaskManager() {
72   weak_ptr_factory_.InvalidateWeakPtrs();
73 
74   client_.reset();
75   token_.reset();
76 }
77 
Initialize(SyncStatusCode status)78 void SyncTaskManager::Initialize(SyncStatusCode status) {
79   DCHECK(sequence_checker_.CalledOnValidSequence());
80   DCHECK(!token_);
81   NotifyTaskDone(
82       SyncTaskToken::CreateForForegroundTask(
83           weak_ptr_factory_.GetWeakPtr(), task_runner_.get()),
84       status);
85 }
86 
ScheduleTask(const base::Location & from_here,const Task & task,Priority priority,const SyncStatusCallback & callback)87 void SyncTaskManager::ScheduleTask(const base::Location& from_here,
88                                    const Task& task,
89                                    Priority priority,
90                                    const SyncStatusCallback& callback) {
91   DCHECK(sequence_checker_.CalledOnValidSequence());
92 
93   ScheduleSyncTask(from_here,
94                    std::unique_ptr<SyncTask>(new SyncTaskAdapter(task)),
95                    priority, callback);
96 }
97 
ScheduleSyncTask(const base::Location & from_here,std::unique_ptr<SyncTask> task,Priority priority,const SyncStatusCallback & callback)98 void SyncTaskManager::ScheduleSyncTask(const base::Location& from_here,
99                                        std::unique_ptr<SyncTask> task,
100                                        Priority priority,
101                                        const SyncStatusCallback& callback) {
102   DCHECK(sequence_checker_.CalledOnValidSequence());
103 
104   std::unique_ptr<SyncTaskToken> token(GetToken(from_here, callback));
105   if (!token) {
106     PushPendingTask(
107         base::Bind(&SyncTaskManager::ScheduleSyncTask,
108                    weak_ptr_factory_.GetWeakPtr(), from_here,
109                    base::Passed(&task), priority, callback),
110         priority);
111     return;
112   }
113   RunTask(std::move(token), std::move(task));
114 }
115 
ScheduleTaskIfIdle(const base::Location & from_here,const Task & task,const SyncStatusCallback & callback)116 bool SyncTaskManager::ScheduleTaskIfIdle(const base::Location& from_here,
117                                          const Task& task,
118                                          const SyncStatusCallback& callback) {
119   DCHECK(sequence_checker_.CalledOnValidSequence());
120 
121   return ScheduleSyncTaskIfIdle(
122       from_here, std::unique_ptr<SyncTask>(new SyncTaskAdapter(task)),
123       callback);
124 }
125 
ScheduleSyncTaskIfIdle(const base::Location & from_here,std::unique_ptr<SyncTask> task,const SyncStatusCallback & callback)126 bool SyncTaskManager::ScheduleSyncTaskIfIdle(
127     const base::Location& from_here,
128     std::unique_ptr<SyncTask> task,
129     const SyncStatusCallback& callback) {
130   DCHECK(sequence_checker_.CalledOnValidSequence());
131 
132   std::unique_ptr<SyncTaskToken> token(GetToken(from_here, callback));
133   if (!token)
134     return false;
135   RunTask(std::move(token), std::move(task));
136   return true;
137 }
138 
139 // static
NotifyTaskDone(std::unique_ptr<SyncTaskToken> token,SyncStatusCode status)140 void SyncTaskManager::NotifyTaskDone(std::unique_ptr<SyncTaskToken> token,
141                                      SyncStatusCode status) {
142   DCHECK(token);
143 
144   SyncTaskManager* manager = token->manager();
145   if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
146     DCHECK(!manager);
147     SyncStatusCallback callback = token->callback();
148     token->clear_callback();
149     callback.Run(status);
150     return;
151   }
152 
153   if (manager)
154     manager->NotifyTaskDoneBody(std::move(token), status);
155 }
156 
157 // static
UpdateTaskBlocker(std::unique_ptr<SyncTaskToken> current_task_token,std::unique_ptr<TaskBlocker> task_blocker,const Continuation & continuation)158 void SyncTaskManager::UpdateTaskBlocker(
159     std::unique_ptr<SyncTaskToken> current_task_token,
160     std::unique_ptr<TaskBlocker> task_blocker,
161     const Continuation& continuation) {
162   DCHECK(current_task_token);
163 
164   SyncTaskManager* manager = current_task_token->manager();
165   if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
166     DCHECK(!manager);
167     continuation.Run(std::move(current_task_token));
168     return;
169   }
170 
171   if (!manager)
172     return;
173 
174   std::unique_ptr<SyncTaskToken> foreground_task_token;
175   std::unique_ptr<SyncTaskToken> background_task_token;
176   std::unique_ptr<TaskLogger::TaskLog> task_log =
177       current_task_token->PassTaskLog();
178   if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID)
179     foreground_task_token = std::move(current_task_token);
180   else
181     background_task_token = std::move(current_task_token);
182 
183   manager->UpdateTaskBlockerBody(
184       std::move(foreground_task_token), std::move(background_task_token),
185       std::move(task_log), std::move(task_blocker), continuation);
186 }
187 
IsRunningTask(int64_t token_id) const188 bool SyncTaskManager::IsRunningTask(int64_t token_id) const {
189   DCHECK(sequence_checker_.CalledOnValidSequence());
190 
191   // If the client is gone, all task should be aborted.
192   if (!client_)
193     return false;
194 
195   if (token_id == SyncTaskToken::kForegroundTaskTokenID)
196     return true;
197 
198   return running_background_tasks_.find(token_id) !=
199          running_background_tasks_.end();
200 }
201 
// Forwards to |sequence_checker_|.DetachFromSequence().
// NOTE(review): presumably this lets the manager be bound to a different
// sequence on its next checked call — confirm against the sequence checker's
// documentation.
void SyncTaskManager::DetachFromSequence() {
  sequence_checker_.DetachFromSequence();
}
205 
NotifyTaskDoneBody(std::unique_ptr<SyncTaskToken> token,SyncStatusCode status)206 void SyncTaskManager::NotifyTaskDoneBody(std::unique_ptr<SyncTaskToken> token,
207                                          SyncStatusCode status) {
208   DCHECK(sequence_checker_.CalledOnValidSequence());
209   DCHECK(token);
210 
211   DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status
212            << " (" << SyncStatusCodeToString(status) << ")"
213            << " " << token->location().ToString();
214 
215   if (token->task_blocker()) {
216     dependency_manager_.Erase(token->task_blocker());
217     token->clear_task_blocker();
218   }
219 
220   if (client_) {
221     if (token->has_task_log()) {
222       token->FinalizeTaskLog(SyncStatusCodeToString(status));
223       client_->RecordTaskLog(token->PassTaskLog());
224     }
225   }
226 
227   std::unique_ptr<SyncTask> task;
228   SyncStatusCallback callback = token->callback();
229   token->clear_callback();
230   if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) {
231     token_ = std::move(token);
232     task = std::move(running_foreground_task_);
233   } else {
234     task = std::move(running_background_tasks_[token->token_id()]);
235     running_background_tasks_.erase(token->token_id());
236   }
237 
238   // Acquire the token to prevent a new task to jump into the queue.
239   token = std::move(token_);
240 
241   bool task_used_network = false;
242   if (task)
243     task_used_network = task->used_network();
244 
245   if (client_)
246     client_->NotifyLastOperationStatus(status, task_used_network);
247 
248   if (!callback.is_null())
249     callback.Run(status);
250 
251   // Post MaybeStartNextForegroundTask rather than calling it directly to avoid
252   // making the call-chaing longer.
253   task_runner_->PostTask(
254       FROM_HERE,
255       base::BindOnce(&SyncTaskManager::MaybeStartNextForegroundTask,
256                      weak_ptr_factory_.GetWeakPtr(), std::move(token)));
257 }
258 
UpdateTaskBlockerBody(std::unique_ptr<SyncTaskToken> foreground_task_token,std::unique_ptr<SyncTaskToken> background_task_token,std::unique_ptr<TaskLogger::TaskLog> task_log,std::unique_ptr<TaskBlocker> task_blocker,const Continuation & continuation)259 void SyncTaskManager::UpdateTaskBlockerBody(
260     std::unique_ptr<SyncTaskToken> foreground_task_token,
261     std::unique_ptr<SyncTaskToken> background_task_token,
262     std::unique_ptr<TaskLogger::TaskLog> task_log,
263     std::unique_ptr<TaskBlocker> task_blocker,
264     const Continuation& continuation) {
265   DCHECK(sequence_checker_.CalledOnValidSequence());
266 
267   // Run the task directly if the parallelization is disabled.
268   if (!maximum_background_task_) {
269     DCHECK(foreground_task_token);
270     DCHECK(!background_task_token);
271     foreground_task_token->SetTaskLog(std::move(task_log));
272     continuation.Run(std::move(foreground_task_token));
273     return;
274   }
275 
276   // Clear existing |task_blocker| from |dependency_manager_| before
277   // getting |foreground_task_token|, so that we can avoid dead lock.
278   if (background_task_token && background_task_token->task_blocker()) {
279     dependency_manager_.Erase(background_task_token->task_blocker());
280     background_task_token->clear_task_blocker();
281   }
282 
283   // Try to get |foreground_task_token|.  If it's not available, wait for
284   // current foreground task to finish.
285   if (!foreground_task_token) {
286     DCHECK(background_task_token);
287     foreground_task_token = GetToken(background_task_token->location(),
288                                      SyncStatusCallback());
289     if (!foreground_task_token) {
290       PushPendingTask(
291           base::Bind(&SyncTaskManager::UpdateTaskBlockerBody,
292                      weak_ptr_factory_.GetWeakPtr(),
293                      base::Passed(&foreground_task_token),
294                      base::Passed(&background_task_token),
295                      base::Passed(&task_log),
296                      base::Passed(&task_blocker),
297                      continuation),
298           PRIORITY_HIGH);
299       MaybeStartNextForegroundTask(nullptr);
300       return;
301     }
302   }
303 
304   // Check if the task can run as a background task now.
305   // If there are too many task running or any other task blocks current
306   // task, wait for any other task to finish.
307   bool task_number_limit_exceeded =
308       !background_task_token &&
309       running_background_tasks_.size() >= maximum_background_task_;
310   if (task_number_limit_exceeded ||
311       !dependency_manager_.Insert(task_blocker.get())) {
312     DCHECK(!running_background_tasks_.empty());
313     DCHECK(pending_backgrounding_task_.is_null());
314 
315     // Wait for NotifyTaskDone to release a |task_blocker|.
316     pending_backgrounding_task_ =
317         base::Bind(&SyncTaskManager::UpdateTaskBlockerBody,
318                    weak_ptr_factory_.GetWeakPtr(),
319                    base::Passed(&foreground_task_token),
320                    base::Passed(&background_task_token),
321                    base::Passed(&task_log),
322                    base::Passed(&task_blocker),
323                    continuation);
324     return;
325   }
326 
327   if (background_task_token) {
328     background_task_token->set_task_blocker(std::move(task_blocker));
329   } else {
330     base::Location from_here = foreground_task_token->location();
331     SyncStatusCallback callback = foreground_task_token->callback();
332     foreground_task_token->clear_callback();
333 
334     background_task_token = SyncTaskToken::CreateForBackgroundTask(
335         weak_ptr_factory_.GetWeakPtr(), task_runner_.get(), task_token_seq_++,
336         std::move(task_blocker));
337     background_task_token->UpdateTask(from_here, callback);
338     running_background_tasks_[background_task_token->token_id()] =
339         std::move(running_foreground_task_);
340   }
341 
342   token_ = std::move(foreground_task_token);
343   MaybeStartNextForegroundTask(nullptr);
344   background_task_token->SetTaskLog(std::move(task_log));
345   continuation.Run(std::move(background_task_token));
346 }
347 
GetToken(const base::Location & from_here,const SyncStatusCallback & callback)348 std::unique_ptr<SyncTaskToken> SyncTaskManager::GetToken(
349     const base::Location& from_here,
350     const SyncStatusCallback& callback) {
351   DCHECK(sequence_checker_.CalledOnValidSequence());
352 
353   if (!token_)
354     return nullptr;
355   token_->UpdateTask(from_here, callback);
356   return std::move(token_);
357 }
358 
PushPendingTask(const base::Closure & closure,Priority priority)359 void SyncTaskManager::PushPendingTask(
360     const base::Closure& closure, Priority priority) {
361   DCHECK(sequence_checker_.CalledOnValidSequence());
362 
363   pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++));
364 }
365 
RunTask(std::unique_ptr<SyncTaskToken> token,std::unique_ptr<SyncTask> task)366 void SyncTaskManager::RunTask(std::unique_ptr<SyncTaskToken> token,
367                               std::unique_ptr<SyncTask> task) {
368   DCHECK(sequence_checker_.CalledOnValidSequence());
369   DCHECK(!running_foreground_task_);
370 
371   running_foreground_task_ = std::move(task);
372   running_foreground_task_->RunPreflight(std::move(token));
373 }
374 
MaybeStartNextForegroundTask(std::unique_ptr<SyncTaskToken> token)375 void SyncTaskManager::MaybeStartNextForegroundTask(
376     std::unique_ptr<SyncTaskToken> token) {
377   DCHECK(sequence_checker_.CalledOnValidSequence());
378 
379   if (token) {
380     DCHECK(!token_);
381     token_ = std::move(token);
382   }
383 
384   if (!pending_backgrounding_task_.is_null()) {
385     base::Closure closure = pending_backgrounding_task_;
386     pending_backgrounding_task_.Reset();
387     closure.Run();
388     return;
389   }
390 
391   if (!token_)
392     return;
393 
394   if (!pending_tasks_.empty()) {
395     base::Closure closure = pending_tasks_.top().task;
396     pending_tasks_.pop();
397     closure.Run();
398     return;
399   }
400 
401   if (client_)
402     client_->MaybeScheduleNextTask();
403 }
404 
405 }  // namespace drive_backend
406 }  // namespace sync_file_system
407