1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/sequence_manager/work_queue_sets.h"
6 
7 #include "base/logging.h"
8 
9 namespace base {
10 namespace sequence_manager {
11 namespace internal {
12 
WorkQueueSets(const char * name,Observer * observer,const SequenceManager::Settings & settings)13 WorkQueueSets::WorkQueueSets(const char* name,
14                              Observer* observer,
15                              const SequenceManager::Settings& settings)
16     : name_(name),
17 #if DCHECK_IS_ON()
18       last_rand_(settings.random_task_selection_seed),
19 #endif
20       observer_(observer) {
21 }
22 
23 WorkQueueSets::~WorkQueueSets() = default;
24 
AddQueue(WorkQueue * work_queue,size_t set_index)25 void WorkQueueSets::AddQueue(WorkQueue* work_queue, size_t set_index) {
26   DCHECK(!work_queue->work_queue_sets());
27   DCHECK_LT(set_index, work_queue_heaps_.size());
28   DCHECK(!work_queue->heap_handle().IsValid());
29   EnqueueOrder enqueue_order;
30   bool has_enqueue_order = work_queue->GetFrontTaskEnqueueOrder(&enqueue_order);
31   work_queue->AssignToWorkQueueSets(this);
32   work_queue->AssignSetIndex(set_index);
33   if (!has_enqueue_order)
34     return;
35   bool was_empty = work_queue_heaps_[set_index].empty();
36   work_queue_heaps_[set_index].insert({enqueue_order, work_queue});
37   if (was_empty)
38     observer_->WorkQueueSetBecameNonEmpty(set_index);
39 }
40 
RemoveQueue(WorkQueue * work_queue)41 void WorkQueueSets::RemoveQueue(WorkQueue* work_queue) {
42   DCHECK_EQ(this, work_queue->work_queue_sets());
43   work_queue->AssignToWorkQueueSets(nullptr);
44   if (!work_queue->heap_handle().IsValid())
45     return;
46   size_t set_index = work_queue->work_queue_set_index();
47   DCHECK_LT(set_index, work_queue_heaps_.size());
48   work_queue_heaps_[set_index].erase(work_queue->heap_handle());
49   if (work_queue_heaps_[set_index].empty())
50     observer_->WorkQueueSetBecameEmpty(set_index);
51   DCHECK(!work_queue->heap_handle().IsValid());
52 }
53 
ChangeSetIndex(WorkQueue * work_queue,size_t set_index)54 void WorkQueueSets::ChangeSetIndex(WorkQueue* work_queue, size_t set_index) {
55   DCHECK_EQ(this, work_queue->work_queue_sets());
56   DCHECK_LT(set_index, work_queue_heaps_.size());
57   EnqueueOrder enqueue_order;
58   bool has_enqueue_order = work_queue->GetFrontTaskEnqueueOrder(&enqueue_order);
59   size_t old_set = work_queue->work_queue_set_index();
60   DCHECK_LT(old_set, work_queue_heaps_.size());
61   DCHECK_NE(old_set, set_index);
62   work_queue->AssignSetIndex(set_index);
63   DCHECK_EQ(has_enqueue_order, work_queue->heap_handle().IsValid());
64   if (!has_enqueue_order)
65     return;
66   work_queue_heaps_[old_set].erase(work_queue->heap_handle());
67   bool was_empty = work_queue_heaps_[set_index].empty();
68   work_queue_heaps_[set_index].insert({enqueue_order, work_queue});
69   if (work_queue_heaps_[old_set].empty())
70     observer_->WorkQueueSetBecameEmpty(old_set);
71   if (was_empty)
72     observer_->WorkQueueSetBecameNonEmpty(set_index);
73 }
74 
OnQueuesFrontTaskChanged(WorkQueue * work_queue)75 void WorkQueueSets::OnQueuesFrontTaskChanged(WorkQueue* work_queue) {
76   EnqueueOrder enqueue_order;
77   size_t set_index = work_queue->work_queue_set_index();
78   DCHECK_EQ(this, work_queue->work_queue_sets());
79   DCHECK_LT(set_index, work_queue_heaps_.size());
80   DCHECK(work_queue->heap_handle().IsValid());
81   DCHECK(!work_queue_heaps_[set_index].empty()) << " set_index = " << set_index;
82   if (work_queue->GetFrontTaskEnqueueOrder(&enqueue_order)) {
83     // O(log n)
84     work_queue_heaps_[set_index].ChangeKey(work_queue->heap_handle(),
85                                            {enqueue_order, work_queue});
86   } else {
87     // O(log n)
88     work_queue_heaps_[set_index].erase(work_queue->heap_handle());
89     DCHECK(!work_queue->heap_handle().IsValid());
90     if (work_queue_heaps_[set_index].empty())
91       observer_->WorkQueueSetBecameEmpty(set_index);
92   }
93 }
94 
OnTaskPushedToEmptyQueue(WorkQueue * work_queue)95 void WorkQueueSets::OnTaskPushedToEmptyQueue(WorkQueue* work_queue) {
96   // NOTE if this function changes, we need to keep |WorkQueueSets::AddQueue| in
97   // sync.
98   DCHECK_EQ(this, work_queue->work_queue_sets());
99   EnqueueOrder enqueue_order;
100   bool has_enqueue_order = work_queue->GetFrontTaskEnqueueOrder(&enqueue_order);
101   DCHECK(has_enqueue_order);
102   size_t set_index = work_queue->work_queue_set_index();
103   DCHECK_LT(set_index, work_queue_heaps_.size())
104       << " set_index = " << set_index;
105   // |work_queue| should not be in work_queue_heaps_[set_index].
106   DCHECK(!work_queue->heap_handle().IsValid());
107   bool was_empty = work_queue_heaps_[set_index].empty();
108   work_queue_heaps_[set_index].insert({enqueue_order, work_queue});
109   if (was_empty)
110     observer_->WorkQueueSetBecameNonEmpty(set_index);
111 }
112 
OnPopMinQueueInSet(WorkQueue * work_queue)113 void WorkQueueSets::OnPopMinQueueInSet(WorkQueue* work_queue) {
114   // Assume that |work_queue| contains the lowest enqueue_order.
115   size_t set_index = work_queue->work_queue_set_index();
116   DCHECK_EQ(this, work_queue->work_queue_sets());
117   DCHECK_LT(set_index, work_queue_heaps_.size());
118   DCHECK(!work_queue_heaps_[set_index].empty()) << " set_index = " << set_index;
119   DCHECK_EQ(work_queue_heaps_[set_index].Min().value, work_queue)
120       << " set_index = " << set_index;
121   DCHECK(work_queue->heap_handle().IsValid());
122   EnqueueOrder enqueue_order;
123   if (work_queue->GetFrontTaskEnqueueOrder(&enqueue_order)) {
124     // O(log n)
125     work_queue_heaps_[set_index].ReplaceMin({enqueue_order, work_queue});
126   } else {
127     // O(log n)
128     work_queue_heaps_[set_index].Pop();
129     DCHECK(!work_queue->heap_handle().IsValid());
130     DCHECK(work_queue_heaps_[set_index].empty() ||
131            work_queue_heaps_[set_index].Min().value != work_queue);
132     if (work_queue_heaps_[set_index].empty()) {
133       observer_->WorkQueueSetBecameEmpty(set_index);
134     }
135   }
136 }
137 
OnQueueBlocked(WorkQueue * work_queue)138 void WorkQueueSets::OnQueueBlocked(WorkQueue* work_queue) {
139   DCHECK_EQ(this, work_queue->work_queue_sets());
140   base::internal::HeapHandle heap_handle = work_queue->heap_handle();
141   if (!heap_handle.IsValid())
142     return;
143   size_t set_index = work_queue->work_queue_set_index();
144   DCHECK_LT(set_index, work_queue_heaps_.size());
145   work_queue_heaps_[set_index].erase(heap_handle);
146   if (work_queue_heaps_[set_index].empty())
147     observer_->WorkQueueSetBecameEmpty(set_index);
148 }
149 
GetOldestQueueInSet(size_t set_index) const150 WorkQueue* WorkQueueSets::GetOldestQueueInSet(size_t set_index) const {
151   DCHECK_LT(set_index, work_queue_heaps_.size());
152   if (work_queue_heaps_[set_index].empty())
153     return nullptr;
154   WorkQueue* queue = work_queue_heaps_[set_index].Min().value;
155   DCHECK_EQ(set_index, queue->work_queue_set_index());
156   DCHECK(queue->heap_handle().IsValid());
157   return queue;
158 }
159 
GetOldestQueueAndEnqueueOrderInSet(size_t set_index,EnqueueOrder * out_enqueue_order) const160 WorkQueue* WorkQueueSets::GetOldestQueueAndEnqueueOrderInSet(
161     size_t set_index,
162     EnqueueOrder* out_enqueue_order) const {
163   DCHECK_LT(set_index, work_queue_heaps_.size());
164   if (work_queue_heaps_[set_index].empty())
165     return nullptr;
166   const OldestTaskEnqueueOrder& oldest = work_queue_heaps_[set_index].Min();
167   DCHECK(oldest.value->heap_handle().IsValid());
168   *out_enqueue_order = oldest.key;
169   EnqueueOrder enqueue_order;
170   DCHECK(oldest.value->GetFrontTaskEnqueueOrder(&enqueue_order) &&
171          oldest.key == enqueue_order);
172   return oldest.value;
173 }
174 
175 #if DCHECK_IS_ON()
GetRandomQueueInSet(size_t set_index) const176 WorkQueue* WorkQueueSets::GetRandomQueueInSet(size_t set_index) const {
177   DCHECK_LT(set_index, work_queue_heaps_.size());
178   if (work_queue_heaps_[set_index].empty())
179     return nullptr;
180 
181   WorkQueue* queue =
182       work_queue_heaps_[set_index]
183           .begin()[Random() % work_queue_heaps_[set_index].size()]
184           .value;
185   DCHECK_EQ(set_index, queue->work_queue_set_index());
186   DCHECK(queue->heap_handle().IsValid());
187   return queue;
188 }
189 
GetRandomQueueAndEnqueueOrderInSet(size_t set_index,EnqueueOrder * out_enqueue_order) const190 WorkQueue* WorkQueueSets::GetRandomQueueAndEnqueueOrderInSet(
191     size_t set_index,
192     EnqueueOrder* out_enqueue_order) const {
193   DCHECK_LT(set_index, work_queue_heaps_.size());
194   if (work_queue_heaps_[set_index].empty())
195     return nullptr;
196   const OldestTaskEnqueueOrder& chosen =
197       work_queue_heaps_[set_index]
198           .begin()[Random() % work_queue_heaps_[set_index].size()];
199   *out_enqueue_order = chosen.key;
200   EnqueueOrder enqueue_order;
201   DCHECK(chosen.value->GetFrontTaskEnqueueOrder(&enqueue_order) &&
202          chosen.key == enqueue_order);
203   return chosen.value;
204 }
205 #endif
206 
IsSetEmpty(size_t set_index) const207 bool WorkQueueSets::IsSetEmpty(size_t set_index) const {
208   DCHECK_LT(set_index, work_queue_heaps_.size())
209       << " set_index = " << set_index;
210   return work_queue_heaps_[set_index].empty();
211 }
212 
213 #if DCHECK_IS_ON() || !defined(NDEBUG)
ContainsWorkQueueForTest(const WorkQueue * work_queue) const214 bool WorkQueueSets::ContainsWorkQueueForTest(
215     const WorkQueue* work_queue) const {
216   EnqueueOrder enqueue_order;
217   bool has_enqueue_order = work_queue->GetFrontTaskEnqueueOrder(&enqueue_order);
218 
219   for (const base::internal::IntrusiveHeap<OldestTaskEnqueueOrder>& heap :
220        work_queue_heaps_) {
221     for (const OldestTaskEnqueueOrder& heap_value_pair : heap) {
222       if (heap_value_pair.value == work_queue) {
223         DCHECK(has_enqueue_order);
224         DCHECK_EQ(heap_value_pair.key, enqueue_order);
225         DCHECK_EQ(this, work_queue->work_queue_sets());
226         return true;
227       }
228     }
229   }
230 
231   if (work_queue->work_queue_sets() == this) {
232     DCHECK(!has_enqueue_order);
233     return true;
234   }
235 
236   return false;
237 }
238 #endif
239 
CollectSkippedOverLowerPriorityTasks(const internal::WorkQueue * selected_work_queue,std::vector<const Task * > * result) const240 void WorkQueueSets::CollectSkippedOverLowerPriorityTasks(
241     const internal::WorkQueue* selected_work_queue,
242     std::vector<const Task*>* result) const {
243   EnqueueOrder selected_enqueue_order;
244   CHECK(selected_work_queue->GetFrontTaskEnqueueOrder(&selected_enqueue_order));
245   for (size_t priority = selected_work_queue->work_queue_set_index() + 1;
246        priority < TaskQueue::kQueuePriorityCount; priority++) {
247     for (const OldestTaskEnqueueOrder& pair : work_queue_heaps_[priority]) {
248       pair.value->CollectTasksOlderThan(selected_enqueue_order, result);
249     }
250   }
251 }
252 
253 }  // namespace internal
254 }  // namespace sequence_manager
255 }  // namespace base
256