/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/executors/EDFThreadPoolExecutor.h>

#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <exception>
#include <limits>
#include <memory>
#include <queue>
#include <utility>
#include <vector>

#include <folly/ScopeGuard.h>

namespace folly {

class EDFThreadPoolExecutor::Task {
 public:
  explicit Task(Func&& f, int repeat, uint64_t deadline)
      : f_(std::move(f)), total_(repeat), deadline_(deadline) {}

  explicit Task(std::vector<Func>&& fs, uint64_t deadline)
      : fs_(std::move(fs)), total_(fs_.size()), deadline_(deadline) {}

  uint64_t getDeadline() const { return deadline_; }

  bool isDone() const {
    return iter_.load(std::memory_order_relaxed) >= total_;
  }

  int next() {
    if (isDone()) {
      return -1;
    }

    int result = iter_.fetch_add(1, std::memory_order_relaxed);
    return result < total_ ? result : -1;
  }

  void run(int i) {
    folly::RequestContextScopeGuard guard(context_);
    if (f_) {
      f_();
      if (i >= total_ - 1) {
        std::exchange(f_, nullptr);
      }
    } else {
      DCHECK(0 <= i && i < total_);
      fs_[i]();
      std::exchange(fs_[i], nullptr);
    }
  }

  Func f_;
  std::vector<Func> fs_;
  std::atomic<int> iter_{0};
  int total_;
  uint64_t deadline_;
  std::shared_ptr<RequestContext> context_ = RequestContext::saveContext();
  std::chrono::steady_clock::time_point enqueueTime_ =
      std::chrono::steady_clock::now();
};
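
// A minimal usage sketch of the Task fan-out contract (illustrative only;
// `doWork` and `d` are placeholders, not names from this file). A Task either
// runs one Func `repeat` times or runs each Func in a vector once; workers
// claim iteration indices via next(), and each claimed index runs exactly
// once:
//
//   auto task = std::make_shared<Task>([] { doWork(); }, /*repeat=*/3, d);
//   for (int i = task->next(); i >= 0; i = task->next()) {
//     task->run(i);
//   }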

class EDFThreadPoolExecutor::TaskQueue {
 public:
  using TaskPtr = std::shared_ptr<Task>;

  // This is not a `Synchronized` because we perform a few "peek" operations.
  struct Bucket {
    SharedMutex mutex;

    struct Compare {
      bool operator()(const TaskPtr& lhs, const TaskPtr& rhs) const {
        return lhs->getDeadline() > rhs->getDeadline();
      }
    };

    std::priority_queue<TaskPtr, std::vector<TaskPtr>, Compare> tasks;
    std::atomic<bool> empty{true};
  };

  static constexpr std::size_t kNumBuckets = 2 << 5; // i.e., 64 buckets

  explicit TaskQueue()
      : buckets_{}, curDeadline_(kLatestDeadline), numItems_(0) {}

  void push(TaskPtr task) {
    auto deadline = task->getDeadline();
    auto& bucket = getBucket(deadline);
    {
      SharedMutex::WriteHolder guard(&bucket.mutex);
      bucket.tasks.push(std::move(task));
      bucket.empty.store(bucket.tasks.empty(), std::memory_order_relaxed);
    }

    numItems_.fetch_add(1, std::memory_order_seq_cst);

    // Update the current earliest deadline if necessary.
    uint64_t curDeadline = curDeadline_.load(std::memory_order_relaxed);
    do {
      if (curDeadline <= deadline) {
        break;
      }
    } while (!curDeadline_.compare_exchange_weak(
        curDeadline, deadline, std::memory_order_relaxed));
  }

  TaskPtr pop() {
    bool needDeadlineUpdate = false;
    for (;;) {
      if (numItems_.load(std::memory_order_seq_cst) == 0) {
        return nullptr;
      }

      auto curDeadline = curDeadline_.load(std::memory_order_relaxed);
      auto& bucket = getBucket(curDeadline);

      if (needDeadlineUpdate || bucket.empty.load(std::memory_order_relaxed)) {
        // Try to advance to the next earliest deadline. A failed CAS is fine:
        // a concurrent insertion may have already updated `curDeadline_`.
        // If there is no next deadline, this resets it to `kLatestDeadline`.
        curDeadline_.compare_exchange_weak(
            curDeadline,
            findNextDeadline(curDeadline),
            std::memory_order_relaxed);
        needDeadlineUpdate = false;
        continue;
      }

      {
        // Fast path. Take the bucket reader lock.
        SharedMutex::ReadHolder guard(&bucket.mutex);
        if (bucket.tasks.empty()) {
          continue;
        }
        const auto& task = bucket.tasks.top();
        if (!task->isDone() && task->getDeadline() == curDeadline) {
          return task;
        }
        // If the task is already finished, fall through to remove it.
      }

      {
        // Take the writer lock to clean up the finished task.
        SharedMutex::WriteHolder guard(&bucket.mutex);
        if (bucket.tasks.empty()) {
          continue;
        }
        const auto& task = bucket.tasks.top();
        if (task->isDone()) {
          // Current task finished. Remove it from the queue.
          bucket.tasks.pop();
          bucket.empty.store(bucket.tasks.empty(), std::memory_order_relaxed);
          numItems_.fetch_sub(1, std::memory_order_seq_cst);
        }
      }

      // We may have finished processing the current task / bucket. Go back to
      // the top of the loop to find the next bucket.
      needDeadlineUpdate = true;
    }
  }

  std::size_t size() const { return numItems_.load(std::memory_order_seq_cst); }

 private:
  Bucket& getBucket(uint64_t deadline) {
    return buckets_[deadline % kNumBuckets];
  }

  uint64_t findNextDeadline(uint64_t prevDeadline) {
    auto begin = prevDeadline % kNumBuckets;

    uint64_t earliestDeadline = kLatestDeadline;
    for (std::size_t i = 0; i < kNumBuckets; ++i) {
      auto& bucket = buckets_[(begin + i) % kNumBuckets];

      // Peek without locking first.
      if (bucket.empty.load(std::memory_order_relaxed)) {
        continue;
      }

      SharedMutex::ReadHolder guard(&bucket.mutex);
      auto curDeadline = curDeadline_.load(std::memory_order_relaxed);
      if (prevDeadline != curDeadline) {
        // Bail out early if someone else has already updated the deadline.
        return curDeadline;
      }

      // Verify again after locking.
      if (bucket.tasks.empty()) {
        continue;
      }

      const auto& task = bucket.tasks.top();
      auto deadline = task->getDeadline();

      if (deadline < earliestDeadline) {
        earliestDeadline = deadline;
      }

      if ((deadline <= prevDeadline) ||
          (deadline - prevDeadline < kNumBuckets)) {
        // Found the next earliest deadline, or new tasks were added.
        // No need to scan any further.
        break;
      }
    }

    return earliestDeadline;
  }

  std::array<Bucket, kNumBuckets> buckets_;
  std::atomic<uint64_t> curDeadline_;

  // All operations performed on `numItems_` explicitly specify
  // `std::memory_order_seq_cst`. This is because `numItems_` takes part in a
  // Dekker-style protocol with `numIdleThreads_` before consumer threads
  // (workers) wait on `sem_`.
  std::atomic<std::size_t> numItems_;
};
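
// A sketch of the Dekker-style wakeup protocol mentioned above (illustrative
// only, simplified from add() and take() below):
//
//   producer (add)                       consumer (take)
//   ---------------------------------    ---------------------------------
//   taskQueue_->push(task);              numIdleThreads_.fetch_add(1);
//   // increments numItems_, seq_cst     // seq_cst
//   if (numIdleThreads_ > 0) {           if (auto t = taskQueue_->pop()) {
//     sem_.post(...);                      return t; // sees numItems_ > 0
//   }                                    }
//                                        sem_.wait();
//
// Because both counters use seq_cst operations, at least one side observes
// the other's update: either the producer sees the idle thread and posts the
// semaphore, or the consumer sees the new item and never blocks, so no
// wakeup is lost.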

EDFThreadPoolExecutor::EDFThreadPoolExecutor(
    std::size_t numThreads, std::shared_ptr<ThreadFactory> threadFactory)
    : ThreadPoolExecutor(numThreads, numThreads, std::move(threadFactory)),
      taskQueue_(std::make_unique<TaskQueue>()) {
  setNumThreads(numThreads);
  registerThreadPoolExecutor(this);
}

EDFThreadPoolExecutor::~EDFThreadPoolExecutor() {
  deregisterThreadPoolExecutor(this);
  stop();
}

void EDFThreadPoolExecutor::add(Func f) {
  add(std::move(f), kLatestDeadline);
}

void EDFThreadPoolExecutor::add(Func f, uint64_t deadline) {
  add(std::move(f), 1, deadline);
}

void EDFThreadPoolExecutor::add(Func f, std::size_t total, uint64_t deadline) {
  if (UNLIKELY(isJoin_.load(std::memory_order_relaxed) || total == 0)) {
    return;
  }

  taskQueue_->push(std::make_shared<Task>(std::move(f), total, deadline));

  auto numIdleThreads = numIdleThreads_.load(std::memory_order_seq_cst);
  if (numIdleThreads > 0) {
    // If idle threads are available, notify them; otherwise all worker
    // threads are running and will get to this task in time.
    sem_.post(std::min(total, numIdleThreads));
  }
}

void EDFThreadPoolExecutor::add(std::vector<Func> fs, uint64_t deadline) {
  if (UNLIKELY(fs.empty())) {
    return;
  }

  auto total = fs.size();
  taskQueue_->push(std::make_shared<Task>(std::move(fs), deadline));

  auto numIdleThreads = numIdleThreads_.load(std::memory_order_seq_cst);
  if (numIdleThreads > 0) {
    // If idle threads are available, notify them; otherwise all worker
    // threads are running and will get to this task in time.
    sem_.post(std::min(total, numIdleThreads));
  }
}
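
// Usage sketch (illustrative only; assumes the header's default thread
// factory for the constructor). Under contention, tasks with earlier
// deadlines are dequeued first:
//
//   EDFThreadPoolExecutor pool(4);
//   pool.add([] { /* urgent */ }, /*deadline=*/1);
//   pool.add([] { /* lax */ }, /*deadline=*/100);
//   pool.add([] { /* no deadline: runs last when contended */ });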

folly::Executor::KeepAlive<> EDFThreadPoolExecutor::deadlineExecutor(
    uint64_t deadline) {
  class DeadlineExecutor : public folly::Executor {
   public:
    static KeepAlive<> create(
        uint64_t deadline, KeepAlive<EDFThreadPoolExecutor> executor) {
      return makeKeepAlive(new DeadlineExecutor(deadline, std::move(executor)));
    }

    void add(folly::Func f) override {
      executor_->add(std::move(f), deadline_);
    }

    bool keepAliveAcquire() noexcept override {
      const auto count =
          keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
      DCHECK_GT(count, 0);
      return true;
    }

    void keepAliveRelease() noexcept override {
      const auto count =
          keepAliveCount_.fetch_sub(1, std::memory_order_acq_rel);
      DCHECK_GT(count, 0);
      if (count == 1) {
        delete this;
      }
    }

   private:
    DeadlineExecutor(
        uint64_t deadline, KeepAlive<EDFThreadPoolExecutor> executor)
        : deadline_(deadline), executor_(std::move(executor)) {}

    std::atomic<size_t> keepAliveCount_{1};
    uint64_t deadline_;
    KeepAlive<EDFThreadPoolExecutor> executor_;
  };
  return DeadlineExecutor::create(deadline, getKeepAliveToken(this));
}
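
// Usage sketch (illustrative only; `pool` is a placeholder): every Func added
// through the returned executor inherits the captured deadline.
//
//   auto ex = pool.deadlineExecutor(/*deadline=*/42);
//   ex->add([] { /* step one */ });
//   ex->add([] { /* step two, same deadline as step one */ });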

void EDFThreadPoolExecutor::threadRun(ThreadPtr thread) {
  this->threadPoolHook_.registerThread();
  ExecutorBlockingGuard guard{
      ExecutorBlockingGuard::TrackTag{}, this, namePrefix_};

  thread->startupBaton.post();
  for (;;) {
    auto task = take();

    // Handle thread stopping
    if (UNLIKELY(!task)) {
      // Actually remove the thread from the list.
      SharedMutex::WriteHolder w{&threadListLock_};
      for (auto& o : observers_) {
        o->threadStopped(thread.get());
      }
      threadList_.remove(thread);
      stoppedThreads_.add(thread);
      return;
    }

    int iter = task->next();
    if (UNLIKELY(iter < 0)) {
      // This task is already finished
      continue;
    }

    thread->idle.store(false, std::memory_order_relaxed);
    auto startTime = std::chrono::steady_clock::now();
    TaskStats stats;
    stats.enqueueTime = task->enqueueTime_;
    if (task->context_) {
      stats.requestId = task->context_->getRootId();
    }

    stats.waitTime = startTime - stats.enqueueTime;
    invokeCatchingExns("EDFThreadPoolExecutor: func", [&] {
      std::exchange(task, {})->run(iter);
    });
    stats.runTime = std::chrono::steady_clock::now() - startTime;
    thread->idle.store(true, std::memory_order_relaxed);
    thread->lastActiveTime.store(
        std::chrono::steady_clock::now(), std::memory_order_relaxed);
    auto& inCallback = *thread->taskStatsCallbacks->inCallback;
    thread->taskStatsCallbacks->callbackList.withRLock([&](auto& callbacks) {
      inCallback = true;
      SCOPE_EXIT { inCallback = false; };
      invokeCatchingExns("EDFThreadPoolExecutor: stats callback", [&] {
        for (auto& callback : callbacks) {
          callback(stats);
        }
      });
    });
  }
}

// threadListLock_ is write-locked.
void EDFThreadPoolExecutor::stopThreads(std::size_t numThreads) {
  threadsToStop_.fetch_add(numThreads, std::memory_order_relaxed);
  sem_.post(numThreads);
}

// threadListLock_ is read- (or write-) locked.
std::size_t EDFThreadPoolExecutor::getPendingTaskCountImpl() const {
  return taskQueue_->size();
}

bool EDFThreadPoolExecutor::shouldStop() {
  // In the common case, only perform a load (avoids cache-line bouncing).
  if (threadsToStop_.load(std::memory_order_relaxed) <= 0 ||
      isJoin_.load(std::memory_order_relaxed)) {
    return false;
  }
  // Modify only if needed.
  if (threadsToStop_.fetch_sub(1, std::memory_order_relaxed) > 0) {
    return true;
  } else {
    threadsToStop_.fetch_add(1, std::memory_order_relaxed);
    return false;
  }
}

std::shared_ptr<EDFThreadPoolExecutor::Task> EDFThreadPoolExecutor::take() {
  if (UNLIKELY(shouldStop())) {
    return nullptr;
  }

  if (auto task = taskQueue_->pop()) {
    return task;
  }

  if (UNLIKELY(isJoin_.load(std::memory_order_relaxed))) {
    return nullptr;
  }

  // No tasks on the horizon, so go to sleep.
  numIdleThreads_.fetch_add(1, std::memory_order_seq_cst);

  SCOPE_EXIT { numIdleThreads_.fetch_sub(1, std::memory_order_seq_cst); };

  for (;;) {
    if (UNLIKELY(shouldStop())) {
      return nullptr;
    }

    if (auto task = taskQueue_->pop()) {
      // It's possible to return a finished task here, in which case
      // the worker will call this function again.
      return task;
    }

    if (UNLIKELY(isJoin_.load(std::memory_order_relaxed))) {
      return nullptr;
    }

    sem_.wait();
  }
}

} // namespace folly