1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/chromeos/file_system_provider/queue.h"
6 #include "base/bind.h"
7 #include "base/check_op.h"
8 #include "base/location.h"
9 #include "base/notreached.h"
10 #include "base/single_thread_task_runner.h"
11 #include "base/threading/thread_task_runner_handle.h"
12
13 namespace chromeos {
14 namespace file_system_provider {
15
Task()16 Queue::Task::Task() : token(0) {
17 }
18
Task(size_t token,AbortableCallback callback)19 Queue::Task::Task(size_t token, AbortableCallback callback)
20 : token(token), callback(std::move(callback)) {}
21
22 Queue::Task::Task(Task&& other) = default;
23 Queue::Task& Queue::Task::operator=(Task&& other) = default;
24
~Task()25 Queue::Task::~Task() {
26 }
27
Queue(size_t max_in_parallel)28 Queue::Queue(size_t max_in_parallel)
29 : max_in_parallel_(max_in_parallel), next_token_(1) {
30 CHECK_LT(0u, max_in_parallel);
31 }
32
~Queue()33 Queue::~Queue() {
34 }
35
NewToken()36 size_t Queue::NewToken() {
37 return next_token_++;
38 }
39
Enqueue(size_t token,AbortableCallback callback)40 void Queue::Enqueue(size_t token, AbortableCallback callback) {
41 #if !NDEBUG
42 CHECK(executed_.find(token) == executed_.end());
43 for (auto& task : pending_) {
44 CHECK(token != task.token);
45 }
46 #endif
47 pending_.push_back(Task(token, std::move(callback)));
48 base::ThreadTaskRunnerHandle::Get()->PostTask(
49 FROM_HERE,
50 base::BindOnce(&Queue::MaybeRun, weak_ptr_factory_.GetWeakPtr()));
51 }
52
Complete(size_t token)53 void Queue::Complete(size_t token) {
54 const auto it = executed_.find(token);
55 DCHECK(it != executed_.end());
56 executed_.erase(it);
57 base::ThreadTaskRunnerHandle::Get()->PostTask(
58 FROM_HERE,
59 base::BindOnce(&Queue::MaybeRun, weak_ptr_factory_.GetWeakPtr()));
60 }
61
MaybeRun()62 void Queue::MaybeRun() {
63 if (executed_.size() == max_in_parallel_ || pending_.empty())
64 return;
65
66 CHECK_GT(max_in_parallel_, executed_.size());
67 Task task = std::move(pending_.front());
68 pending_.pop_front();
69
70 auto callback = std::move(task.callback);
71 executed_[task.token] = std::move(task);
72 AbortCallback abort_callback = std::move(callback).Run();
73
74 // It may happen that the task is completed and removed synchronously. Hence,
75 // we need to check if the task is still in the executed collection.
76 const auto executed_task_it = executed_.find(task.token);
77 if (executed_task_it != executed_.end())
78 executed_task_it->second.abort_callback = std::move(abort_callback);
79 }
80
Abort(size_t token)81 void Queue::Abort(size_t token) {
82 // Check if it's running. If so, then abort and expect a Complete() call soon.
83 const auto it = executed_.find(token);
84 if (it != executed_.end()) {
85 Task& task = it->second;
86 AbortCallback abort_callback = std::move(task.abort_callback);
87 DCHECK(!abort_callback.is_null());
88 std::move(abort_callback).Run();
89 return;
90 }
91
92 // Aborting not running tasks is linear. TODO(mtomasz): Optimize if feasible.
93 for (auto it = pending_.begin(); it != pending_.end(); ++it) {
94 if (token == it->token) {
95 pending_.erase(it);
96 base::ThreadTaskRunnerHandle::Get()->PostTask(
97 FROM_HERE,
98 base::BindOnce(&Queue::MaybeRun, weak_ptr_factory_.GetWeakPtr()));
99 return;
100 }
101 }
102
103 // The task is already removed, marked as completed or aborted.
104 NOTREACHED();
105 }
106
107 } // namespace file_system_provider
108 } // namespace chromeos
109