1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "chrome/browser/chromeos/file_system_provider/queue.h"
6 #include "base/bind.h"
7 #include "base/check_op.h"
8 #include "base/location.h"
9 #include "base/notreached.h"
10 #include "base/single_thread_task_runner.h"
11 #include "base/threading/thread_task_runner_handle.h"
12 
13 namespace chromeos {
14 namespace file_system_provider {
15 
Task()16 Queue::Task::Task() : token(0) {
17 }
18 
Task(size_t token,AbortableCallback callback)19 Queue::Task::Task(size_t token, AbortableCallback callback)
20     : token(token), callback(std::move(callback)) {}
21 
22 Queue::Task::Task(Task&& other) = default;
23 Queue::Task& Queue::Task::operator=(Task&& other) = default;
24 
~Task()25 Queue::Task::~Task() {
26 }
27 
Queue(size_t max_in_parallel)28 Queue::Queue(size_t max_in_parallel)
29     : max_in_parallel_(max_in_parallel), next_token_(1) {
30   CHECK_LT(0u, max_in_parallel);
31 }
32 
~Queue()33 Queue::~Queue() {
34 }
35 
NewToken()36 size_t Queue::NewToken() {
37   return next_token_++;
38 }
39 
Enqueue(size_t token,AbortableCallback callback)40 void Queue::Enqueue(size_t token, AbortableCallback callback) {
41 #if !NDEBUG
42   CHECK(executed_.find(token) == executed_.end());
43   for (auto& task : pending_) {
44     CHECK(token != task.token);
45   }
46 #endif
47   pending_.push_back(Task(token, std::move(callback)));
48   base::ThreadTaskRunnerHandle::Get()->PostTask(
49       FROM_HERE,
50       base::BindOnce(&Queue::MaybeRun, weak_ptr_factory_.GetWeakPtr()));
51 }
52 
Complete(size_t token)53 void Queue::Complete(size_t token) {
54   const auto it = executed_.find(token);
55   DCHECK(it != executed_.end());
56   executed_.erase(it);
57   base::ThreadTaskRunnerHandle::Get()->PostTask(
58       FROM_HERE,
59       base::BindOnce(&Queue::MaybeRun, weak_ptr_factory_.GetWeakPtr()));
60 }
61 
MaybeRun()62 void Queue::MaybeRun() {
63   if (executed_.size() == max_in_parallel_ || pending_.empty())
64     return;
65 
66   CHECK_GT(max_in_parallel_, executed_.size());
67   Task task = std::move(pending_.front());
68   pending_.pop_front();
69 
70   auto callback = std::move(task.callback);
71   executed_[task.token] = std::move(task);
72   AbortCallback abort_callback = std::move(callback).Run();
73 
74   // It may happen that the task is completed and removed synchronously. Hence,
75   // we need to check if the task is still in the executed collection.
76   const auto executed_task_it = executed_.find(task.token);
77   if (executed_task_it != executed_.end())
78     executed_task_it->second.abort_callback = std::move(abort_callback);
79 }
80 
Abort(size_t token)81 void Queue::Abort(size_t token) {
82   // Check if it's running. If so, then abort and expect a Complete() call soon.
83   const auto it = executed_.find(token);
84   if (it != executed_.end()) {
85     Task& task = it->second;
86     AbortCallback abort_callback = std::move(task.abort_callback);
87     DCHECK(!abort_callback.is_null());
88     std::move(abort_callback).Run();
89     return;
90   }
91 
92   // Aborting not running tasks is linear. TODO(mtomasz): Optimize if feasible.
93   for (auto it = pending_.begin(); it != pending_.end(); ++it) {
94     if (token == it->token) {
95       pending_.erase(it);
96       base::ThreadTaskRunnerHandle::Get()->PostTask(
97           FROM_HERE,
98           base::BindOnce(&Queue::MaybeRun, weak_ptr_factory_.GetWeakPtr()));
99       return;
100     }
101   }
102 
103   // The task is already removed, marked as completed or aborted.
104   NOTREACHED();
105 }
106 
107 }  // namespace file_system_provider
108 }  // namespace chromeos
109