1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/sequence_manager/work_queue_sets.h"
6
7 #include "base/logging.h"
8
9 namespace base {
10 namespace sequence_manager {
11 namespace internal {
12
WorkQueueSets(const char * name,Observer * observer,const SequenceManager::Settings & settings)13 WorkQueueSets::WorkQueueSets(const char* name,
14 Observer* observer,
15 const SequenceManager::Settings& settings)
16 : name_(name),
17 #if DCHECK_IS_ON()
18 last_rand_(settings.random_task_selection_seed),
19 #endif
20 observer_(observer) {
21 }
22
23 WorkQueueSets::~WorkQueueSets() = default;
24
AddQueue(WorkQueue * work_queue,size_t set_index)25 void WorkQueueSets::AddQueue(WorkQueue* work_queue, size_t set_index) {
26 DCHECK(!work_queue->work_queue_sets());
27 DCHECK_LT(set_index, work_queue_heaps_.size());
28 DCHECK(!work_queue->heap_handle().IsValid());
29 EnqueueOrder enqueue_order;
30 bool has_enqueue_order = work_queue->GetFrontTaskEnqueueOrder(&enqueue_order);
31 work_queue->AssignToWorkQueueSets(this);
32 work_queue->AssignSetIndex(set_index);
33 if (!has_enqueue_order)
34 return;
35 bool was_empty = work_queue_heaps_[set_index].empty();
36 work_queue_heaps_[set_index].insert({enqueue_order, work_queue});
37 if (was_empty)
38 observer_->WorkQueueSetBecameNonEmpty(set_index);
39 }
40
RemoveQueue(WorkQueue * work_queue)41 void WorkQueueSets::RemoveQueue(WorkQueue* work_queue) {
42 DCHECK_EQ(this, work_queue->work_queue_sets());
43 work_queue->AssignToWorkQueueSets(nullptr);
44 if (!work_queue->heap_handle().IsValid())
45 return;
46 size_t set_index = work_queue->work_queue_set_index();
47 DCHECK_LT(set_index, work_queue_heaps_.size());
48 work_queue_heaps_[set_index].erase(work_queue->heap_handle());
49 if (work_queue_heaps_[set_index].empty())
50 observer_->WorkQueueSetBecameEmpty(set_index);
51 DCHECK(!work_queue->heap_handle().IsValid());
52 }
53
ChangeSetIndex(WorkQueue * work_queue,size_t set_index)54 void WorkQueueSets::ChangeSetIndex(WorkQueue* work_queue, size_t set_index) {
55 DCHECK_EQ(this, work_queue->work_queue_sets());
56 DCHECK_LT(set_index, work_queue_heaps_.size());
57 EnqueueOrder enqueue_order;
58 bool has_enqueue_order = work_queue->GetFrontTaskEnqueueOrder(&enqueue_order);
59 size_t old_set = work_queue->work_queue_set_index();
60 DCHECK_LT(old_set, work_queue_heaps_.size());
61 DCHECK_NE(old_set, set_index);
62 work_queue->AssignSetIndex(set_index);
63 DCHECK_EQ(has_enqueue_order, work_queue->heap_handle().IsValid());
64 if (!has_enqueue_order)
65 return;
66 work_queue_heaps_[old_set].erase(work_queue->heap_handle());
67 bool was_empty = work_queue_heaps_[set_index].empty();
68 work_queue_heaps_[set_index].insert({enqueue_order, work_queue});
69 if (work_queue_heaps_[old_set].empty())
70 observer_->WorkQueueSetBecameEmpty(old_set);
71 if (was_empty)
72 observer_->WorkQueueSetBecameNonEmpty(set_index);
73 }
74
OnQueuesFrontTaskChanged(WorkQueue * work_queue)75 void WorkQueueSets::OnQueuesFrontTaskChanged(WorkQueue* work_queue) {
76 EnqueueOrder enqueue_order;
77 size_t set_index = work_queue->work_queue_set_index();
78 DCHECK_EQ(this, work_queue->work_queue_sets());
79 DCHECK_LT(set_index, work_queue_heaps_.size());
80 DCHECK(work_queue->heap_handle().IsValid());
81 DCHECK(!work_queue_heaps_[set_index].empty()) << " set_index = " << set_index;
82 if (work_queue->GetFrontTaskEnqueueOrder(&enqueue_order)) {
83 // O(log n)
84 work_queue_heaps_[set_index].ChangeKey(work_queue->heap_handle(),
85 {enqueue_order, work_queue});
86 } else {
87 // O(log n)
88 work_queue_heaps_[set_index].erase(work_queue->heap_handle());
89 DCHECK(!work_queue->heap_handle().IsValid());
90 if (work_queue_heaps_[set_index].empty())
91 observer_->WorkQueueSetBecameEmpty(set_index);
92 }
93 }
94
OnTaskPushedToEmptyQueue(WorkQueue * work_queue)95 void WorkQueueSets::OnTaskPushedToEmptyQueue(WorkQueue* work_queue) {
96 // NOTE if this function changes, we need to keep |WorkQueueSets::AddQueue| in
97 // sync.
98 DCHECK_EQ(this, work_queue->work_queue_sets());
99 EnqueueOrder enqueue_order;
100 bool has_enqueue_order = work_queue->GetFrontTaskEnqueueOrder(&enqueue_order);
101 DCHECK(has_enqueue_order);
102 size_t set_index = work_queue->work_queue_set_index();
103 DCHECK_LT(set_index, work_queue_heaps_.size())
104 << " set_index = " << set_index;
105 // |work_queue| should not be in work_queue_heaps_[set_index].
106 DCHECK(!work_queue->heap_handle().IsValid());
107 bool was_empty = work_queue_heaps_[set_index].empty();
108 work_queue_heaps_[set_index].insert({enqueue_order, work_queue});
109 if (was_empty)
110 observer_->WorkQueueSetBecameNonEmpty(set_index);
111 }
112
OnPopMinQueueInSet(WorkQueue * work_queue)113 void WorkQueueSets::OnPopMinQueueInSet(WorkQueue* work_queue) {
114 // Assume that |work_queue| contains the lowest enqueue_order.
115 size_t set_index = work_queue->work_queue_set_index();
116 DCHECK_EQ(this, work_queue->work_queue_sets());
117 DCHECK_LT(set_index, work_queue_heaps_.size());
118 DCHECK(!work_queue_heaps_[set_index].empty()) << " set_index = " << set_index;
119 DCHECK_EQ(work_queue_heaps_[set_index].Min().value, work_queue)
120 << " set_index = " << set_index;
121 DCHECK(work_queue->heap_handle().IsValid());
122 EnqueueOrder enqueue_order;
123 if (work_queue->GetFrontTaskEnqueueOrder(&enqueue_order)) {
124 // O(log n)
125 work_queue_heaps_[set_index].ReplaceMin({enqueue_order, work_queue});
126 } else {
127 // O(log n)
128 work_queue_heaps_[set_index].Pop();
129 DCHECK(!work_queue->heap_handle().IsValid());
130 DCHECK(work_queue_heaps_[set_index].empty() ||
131 work_queue_heaps_[set_index].Min().value != work_queue);
132 if (work_queue_heaps_[set_index].empty()) {
133 observer_->WorkQueueSetBecameEmpty(set_index);
134 }
135 }
136 }
137
OnQueueBlocked(WorkQueue * work_queue)138 void WorkQueueSets::OnQueueBlocked(WorkQueue* work_queue) {
139 DCHECK_EQ(this, work_queue->work_queue_sets());
140 base::internal::HeapHandle heap_handle = work_queue->heap_handle();
141 if (!heap_handle.IsValid())
142 return;
143 size_t set_index = work_queue->work_queue_set_index();
144 DCHECK_LT(set_index, work_queue_heaps_.size());
145 work_queue_heaps_[set_index].erase(heap_handle);
146 if (work_queue_heaps_[set_index].empty())
147 observer_->WorkQueueSetBecameEmpty(set_index);
148 }
149
GetOldestQueueInSet(size_t set_index) const150 WorkQueue* WorkQueueSets::GetOldestQueueInSet(size_t set_index) const {
151 DCHECK_LT(set_index, work_queue_heaps_.size());
152 if (work_queue_heaps_[set_index].empty())
153 return nullptr;
154 WorkQueue* queue = work_queue_heaps_[set_index].Min().value;
155 DCHECK_EQ(set_index, queue->work_queue_set_index());
156 DCHECK(queue->heap_handle().IsValid());
157 return queue;
158 }
159
GetOldestQueueAndEnqueueOrderInSet(size_t set_index,EnqueueOrder * out_enqueue_order) const160 WorkQueue* WorkQueueSets::GetOldestQueueAndEnqueueOrderInSet(
161 size_t set_index,
162 EnqueueOrder* out_enqueue_order) const {
163 DCHECK_LT(set_index, work_queue_heaps_.size());
164 if (work_queue_heaps_[set_index].empty())
165 return nullptr;
166 const OldestTaskEnqueueOrder& oldest = work_queue_heaps_[set_index].Min();
167 DCHECK(oldest.value->heap_handle().IsValid());
168 *out_enqueue_order = oldest.key;
169 EnqueueOrder enqueue_order;
170 DCHECK(oldest.value->GetFrontTaskEnqueueOrder(&enqueue_order) &&
171 oldest.key == enqueue_order);
172 return oldest.value;
173 }
174
175 #if DCHECK_IS_ON()
GetRandomQueueInSet(size_t set_index) const176 WorkQueue* WorkQueueSets::GetRandomQueueInSet(size_t set_index) const {
177 DCHECK_LT(set_index, work_queue_heaps_.size());
178 if (work_queue_heaps_[set_index].empty())
179 return nullptr;
180
181 WorkQueue* queue =
182 work_queue_heaps_[set_index]
183 .begin()[Random() % work_queue_heaps_[set_index].size()]
184 .value;
185 DCHECK_EQ(set_index, queue->work_queue_set_index());
186 DCHECK(queue->heap_handle().IsValid());
187 return queue;
188 }
189
GetRandomQueueAndEnqueueOrderInSet(size_t set_index,EnqueueOrder * out_enqueue_order) const190 WorkQueue* WorkQueueSets::GetRandomQueueAndEnqueueOrderInSet(
191 size_t set_index,
192 EnqueueOrder* out_enqueue_order) const {
193 DCHECK_LT(set_index, work_queue_heaps_.size());
194 if (work_queue_heaps_[set_index].empty())
195 return nullptr;
196 const OldestTaskEnqueueOrder& chosen =
197 work_queue_heaps_[set_index]
198 .begin()[Random() % work_queue_heaps_[set_index].size()];
199 *out_enqueue_order = chosen.key;
200 EnqueueOrder enqueue_order;
201 DCHECK(chosen.value->GetFrontTaskEnqueueOrder(&enqueue_order) &&
202 chosen.key == enqueue_order);
203 return chosen.value;
204 }
205 #endif
206
IsSetEmpty(size_t set_index) const207 bool WorkQueueSets::IsSetEmpty(size_t set_index) const {
208 DCHECK_LT(set_index, work_queue_heaps_.size())
209 << " set_index = " << set_index;
210 return work_queue_heaps_[set_index].empty();
211 }
212
213 #if DCHECK_IS_ON() || !defined(NDEBUG)
ContainsWorkQueueForTest(const WorkQueue * work_queue) const214 bool WorkQueueSets::ContainsWorkQueueForTest(
215 const WorkQueue* work_queue) const {
216 EnqueueOrder enqueue_order;
217 bool has_enqueue_order = work_queue->GetFrontTaskEnqueueOrder(&enqueue_order);
218
219 for (const base::internal::IntrusiveHeap<OldestTaskEnqueueOrder>& heap :
220 work_queue_heaps_) {
221 for (const OldestTaskEnqueueOrder& heap_value_pair : heap) {
222 if (heap_value_pair.value == work_queue) {
223 DCHECK(has_enqueue_order);
224 DCHECK_EQ(heap_value_pair.key, enqueue_order);
225 DCHECK_EQ(this, work_queue->work_queue_sets());
226 return true;
227 }
228 }
229 }
230
231 if (work_queue->work_queue_sets() == this) {
232 DCHECK(!has_enqueue_order);
233 return true;
234 }
235
236 return false;
237 }
238 #endif
239
CollectSkippedOverLowerPriorityTasks(const internal::WorkQueue * selected_work_queue,std::vector<const Task * > * result) const240 void WorkQueueSets::CollectSkippedOverLowerPriorityTasks(
241 const internal::WorkQueue* selected_work_queue,
242 std::vector<const Task*>* result) const {
243 EnqueueOrder selected_enqueue_order;
244 CHECK(selected_work_queue->GetFrontTaskEnqueueOrder(&selected_enqueue_order));
245 for (size_t priority = selected_work_queue->work_queue_set_index() + 1;
246 priority < TaskQueue::kQueuePriorityCount; priority++) {
247 for (const OldestTaskEnqueueOrder& pair : work_queue_heaps_[priority]) {
248 pair.value->CollectTasksOlderThan(selected_enqueue_order, result);
249 }
250 }
251 }
252
253 } // namespace internal
254 } // namespace sequence_manager
255 } // namespace base
256