/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/executors/EDFThreadPoolExecutor.h>

#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <exception>
#include <limits>
#include <memory>
#include <queue>
#include <utility>
#include <vector>

#include <folly/ScopeGuard.h>

namespace folly {
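
// Illustrative usage (a sketch; see EDFThreadPoolExecutor.h for the exact
// constructor signature and defaults):
//
//   folly::EDFThreadPoolExecutor pool(/*numThreads=*/4);
//   pool.add([] { /* less urgent */ }, /*deadline=*/200);
//   pool.add([] { /* more urgent, dequeued first */ }, /*deadline=*/100);
//   pool.join();
//
// Deadlines are opaque uint64_t priorities: tasks with numerically smaller
// deadlines are dequeued first; the plain add(Func) overload uses
// kLatestDeadline, so those tasks are dequeued after everything else.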

class EDFThreadPoolExecutor::Task {
 public:
  explicit Task(Func&& f, int repeat, uint64_t deadline)
      : f_(std::move(f)), total_(repeat), deadline_(deadline) {}

  explicit Task(std::vector<Func>&& fs, uint64_t deadline)
      : fs_(std::move(fs)), total_(fs_.size()), deadline_(deadline) {}

  uint64_t getDeadline() const { return deadline_; }

  bool isDone() const {
    return iter_.load(std::memory_order_relaxed) >= total_;
  }

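  // next() atomically claims one sub-task index. A Task stays in the queue
  // (and may be handed to several workers) until every index has been
  // claimed, at which point next() returns -1 and the caller skips the task.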
  int next() {
    if (isDone()) {
      return -1;
    }

    int result = iter_.fetch_add(1, std::memory_order_relaxed);
    return result < total_ ? result : -1;
  }

  void run(int i) {
    folly::RequestContextScopeGuard guard(context_);
    if (f_) {
      f_();
      if (i >= total_ - 1) {
        std::exchange(f_, nullptr);
      }
    } else {
      DCHECK(0 <= i && i < total_);
      fs_[i]();
      std::exchange(fs_[i], nullptr);
    }
  }

  Func f_;
  std::vector<Func> fs_;
  std::atomic<int> iter_{0};
  int total_;
  uint64_t deadline_;
  std::shared_ptr<RequestContext> context_ = RequestContext::saveContext();
  std::chrono::steady_clock::time_point enqueueTime_ =
      std::chrono::steady_clock::now();
};

class EDFThreadPoolExecutor::TaskQueue {
 public:
  using TaskPtr = std::shared_ptr<Task>;

  // This is not a `Synchronized` because we perform a few "peek" operations.
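  // The `empty` flag mirrors `tasks.empty()` and is read without the lock in
  // pop() and findNextDeadline() so that empty buckets can be skipped cheaply.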
  struct Bucket {
    SharedMutex mutex;

    struct Compare {
      bool operator()(const TaskPtr& lhs, const TaskPtr& rhs) const {
        return lhs->getDeadline() > rhs->getDeadline();
      }
    };

    std::priority_queue<TaskPtr, std::vector<TaskPtr>, Compare> tasks;
    std::atomic<bool> empty{true};
  };

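  // 2 << 5 == 64 buckets. A deadline maps to a bucket via
  // `deadline % kNumBuckets` (see getBucket), so tasks with distinct nearby
  // deadlines land in different buckets and their pushes contend on
  // different locks.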
  static constexpr std::size_t kNumBuckets = 2 << 5;

  explicit TaskQueue()
      : buckets_{}, curDeadline_(kLatestDeadline), numItems_(0) {}

  void push(TaskPtr task) {
    auto deadline = task->getDeadline();
    auto& bucket = getBucket(deadline);
    {
      SharedMutex::WriteHolder guard(&bucket.mutex);
      bucket.tasks.push(std::move(task));
      bucket.empty.store(bucket.tasks.empty(), std::memory_order_relaxed);
    }

    numItems_.fetch_add(1, std::memory_order_seq_cst);

    // Update current earliest deadline if necessary
    uint64_t curDeadline = curDeadline_.load(std::memory_order_relaxed);
    do {
      if (curDeadline <= deadline) {
        break;
      }
    } while (!curDeadline_.compare_exchange_weak(
        curDeadline, deadline, std::memory_order_relaxed));
  }

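  // pop() looks at the bucket named by curDeadline_. If that bucket looks
  // empty (or we just drained a finished task from it), it advances
  // curDeadline_ via findNextDeadline() and retries. It may hand back a task
  // that is already done; the worker detects this via Task::next() and simply
  // calls take() again.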
  TaskPtr pop() {
    bool needDeadlineUpdate = false;
    for (;;) {
      if (numItems_.load(std::memory_order_seq_cst) == 0) {
        return nullptr;
      }

      auto curDeadline = curDeadline_.load(std::memory_order_relaxed);
      auto& bucket = getBucket(curDeadline);

      if (needDeadlineUpdate || bucket.empty.load(std::memory_order_relaxed)) {
        // Try setting the next earliest deadline. However, there is no need
        // to enforce this, as insertions may be happening concurrently.
        // If there is no next deadline, the deadline is set to
        // `kLatestDeadline`.
        curDeadline_.compare_exchange_weak(
            curDeadline,
            findNextDeadline(curDeadline),
            std::memory_order_relaxed);
        needDeadlineUpdate = false;
        continue;
      }

      {
        // Fast path. Take bucket reader lock.
        SharedMutex::ReadHolder guard(&bucket.mutex);
        if (bucket.tasks.empty()) {
          continue;
        }
        const auto& task = bucket.tasks.top();
        if (!task->isDone() && task->getDeadline() == curDeadline) {
          return task;
        }
        // If the task is finished already, fall through to remove it.
      }

      {
        // Take the writer lock to clean up the finished task.
        SharedMutex::WriteHolder guard(&bucket.mutex);
        if (bucket.tasks.empty()) {
          continue;
        }
        const auto& task = bucket.tasks.top();
        if (task->isDone()) {
          // Current task finished. Remove from the queue.
          bucket.tasks.pop();
          bucket.empty.store(bucket.tasks.empty(), std::memory_order_relaxed);
          numItems_.fetch_sub(1, std::memory_order_seq_cst);
        }
      }

      // We may have finished processing the current task / bucket. Go back
      // to the beginning of the loop to find the next bucket.
      needDeadlineUpdate = true;
    }
  }

  std::size_t size() const { return numItems_.load(std::memory_order_seq_cst); }

 private:
  Bucket& getBucket(uint64_t deadline) {
    return buckets_[deadline % kNumBuckets];
  }

  uint64_t findNextDeadline(uint64_t prevDeadline) {
    auto begin = prevDeadline % kNumBuckets;

    uint64_t earliestDeadline = kLatestDeadline;
    for (std::size_t i = 0; i < kNumBuckets; ++i) {
      auto& bucket = buckets_[(begin + i) % kNumBuckets];

      // Peek without locking first.
      if (bucket.empty.load(std::memory_order_relaxed)) {
        continue;
      }

      SharedMutex::ReadHolder guard(&bucket.mutex);
      auto curDeadline = curDeadline_.load(std::memory_order_relaxed);
      if (prevDeadline != curDeadline) {
        // Bail out early if something already happened
        return curDeadline;
      }

      // Verify again after locking
      if (bucket.tasks.empty()) {
        continue;
      }

      const auto& task = bucket.tasks.top();
      auto deadline = task->getDeadline();

      if (deadline < earliestDeadline) {
        earliestDeadline = deadline;
      }

      if ((deadline <= prevDeadline) ||
          (deadline - prevDeadline < kNumBuckets)) {
        // Found the next earliest deadline, or new tasks were added.
        // No need to scan any further.
        break;
      }
    }

    return earliestDeadline;
  }

  std::array<Bucket, kNumBuckets> buckets_;
  std::atomic<uint64_t> curDeadline_;

  // All operations performed on `numItems_` explicitly specify
  // `std::memory_order_seq_cst`. This is because `numItems_` participates in
  // Dekker's algorithm with `numIdleThreads_` before consumer threads
  // (workers) wait on `sem_`.
  std::atomic<std::size_t> numItems_;
};

EDFThreadPoolExecutor::EDFThreadPoolExecutor(
    std::size_t numThreads, std::shared_ptr<ThreadFactory> threadFactory)
    : ThreadPoolExecutor(numThreads, numThreads, std::move(threadFactory)),
      taskQueue_(std::make_unique<TaskQueue>()) {
  setNumThreads(numThreads);
  registerThreadPoolExecutor(this);
}

EDFThreadPoolExecutor::~EDFThreadPoolExecutor() {
  deregisterThreadPoolExecutor(this);
  stop();
}

void EDFThreadPoolExecutor::add(Func f) {
  add(std::move(f), kLatestDeadline);
}

void EDFThreadPoolExecutor::add(Func f, uint64_t deadline) {
  add(std::move(f), 1, deadline);
}

void EDFThreadPoolExecutor::add(Func f, std::size_t total, uint64_t deadline) {
  if (UNLIKELY(isJoin_.load(std::memory_order_relaxed) || total == 0)) {
    return;
  }

  taskQueue_->push(std::make_shared<Task>(std::move(f), total, deadline));

  auto numIdleThreads = numIdleThreads_.load(std::memory_order_seq_cst);
  if (numIdleThreads > 0) {
    // If idle threads are available, notify them; otherwise all worker
    // threads are running and will get around to this task in time.
    sem_.post(std::min(total, numIdleThreads));
  }
}

void EDFThreadPoolExecutor::add(std::vector<Func> fs, uint64_t deadline) {
  if (UNLIKELY(fs.empty())) {
    return;
  }

  auto total = fs.size();
  taskQueue_->push(std::make_shared<Task>(std::move(fs), deadline));

  auto numIdleThreads = numIdleThreads_.load(std::memory_order_seq_cst);
  if (numIdleThreads > 0) {
    // If idle threads are available, notify them; otherwise all worker
    // threads are running and will get around to this task in time.
    sem_.post(std::min(total, numIdleThreads));
  }
}

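// Returns a small adapter Executor whose add() forwards to this pool with the
// given deadline. The adapter manages its own lifetime through the KeepAlive
// count (initially 1, owned by the returned KeepAlive) and keeps this pool
// alive via the captured KeepAlive<EDFThreadPoolExecutor>. For example:
//
//   auto ka = pool.deadlineExecutor(/*deadline=*/100);
//   ka->add([] { /* runs with deadline 100 */ });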
folly::Executor::KeepAlive<> EDFThreadPoolExecutor::deadlineExecutor(
    uint64_t deadline) {
  class DeadlineExecutor : public folly::Executor {
   public:
    static KeepAlive<> create(
        uint64_t deadline, KeepAlive<EDFThreadPoolExecutor> executor) {
      return makeKeepAlive(new DeadlineExecutor(deadline, std::move(executor)));
    }

    void add(folly::Func f) override {
      executor_->add(std::move(f), deadline_);
    }

    bool keepAliveAcquire() noexcept override {
      const auto count =
          keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
      DCHECK_GT(count, 0);
      return true;
    }

    void keepAliveRelease() noexcept override {
      const auto count =
          keepAliveCount_.fetch_sub(1, std::memory_order_acq_rel);
      DCHECK_GT(count, 0);
      if (count == 1) {
        delete this;
      }
    }

   private:
    DeadlineExecutor(
        uint64_t deadline, KeepAlive<EDFThreadPoolExecutor> executor)
        : deadline_(deadline), executor_(std::move(executor)) {}

    std::atomic<size_t> keepAliveCount_{1};
    uint64_t deadline_;
    KeepAlive<EDFThreadPoolExecutor> executor_;
  };
  return DeadlineExecutor::create(deadline, getKeepAliveToken(this));
}

void EDFThreadPoolExecutor::threadRun(ThreadPtr thread) {
  this->threadPoolHook_.registerThread();
  ExecutorBlockingGuard guard{
      ExecutorBlockingGuard::TrackTag{}, this, namePrefix_};

  thread->startupBaton.post();
  for (;;) {
    auto task = take();

    // Handle thread stopping
    if (UNLIKELY(!task)) {
      // Actually remove the thread from the list.
      SharedMutex::WriteHolder w{&threadListLock_};
      for (auto& o : observers_) {
        o->threadStopped(thread.get());
      }
      threadList_.remove(thread);
      stoppedThreads_.add(thread);
      return;
    }

    int iter = task->next();
    if (UNLIKELY(iter < 0)) {
      // This task is already finished
      continue;
    }

    thread->idle.store(false, std::memory_order_relaxed);
    auto startTime = std::chrono::steady_clock::now();
    TaskStats stats;
    stats.enqueueTime = task->enqueueTime_;
    if (task->context_) {
      stats.requestId = task->context_->getRootId();
    }

    stats.waitTime = startTime - stats.enqueueTime;
    invokeCatchingExns("EDFThreadPoolExecutor: func", [&] {
      std::exchange(task, {})->run(iter);
    });
    stats.runTime = std::chrono::steady_clock::now() - startTime;
    thread->idle.store(true, std::memory_order_relaxed);
    thread->lastActiveTime.store(
        std::chrono::steady_clock::now(), std::memory_order_relaxed);
    auto& inCallback = *thread->taskStatsCallbacks->inCallback;
    thread->taskStatsCallbacks->callbackList.withRLock([&](auto& callbacks) {
      inCallback = true;
      SCOPE_EXIT { inCallback = false; };
      invokeCatchingExns("EDFThreadPoolExecutor: stats callback", [&] {
        for (auto& callback : callbacks) {
          callback(stats);
        }
      });
    });
  }
}

// threadListLock_ is write-locked.
void EDFThreadPoolExecutor::stopThreads(std::size_t numThreads) {
  threadsToStop_.fetch_add(numThreads, std::memory_order_relaxed);
  sem_.post(numThreads);
}

// threadListLock_ is read- (or write-) locked.
std::size_t EDFThreadPoolExecutor::getPendingTaskCountImpl() const {
  return taskQueue_->size();
}

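// Claims one pending "stop" request, if any. The fetch_sub either takes a
// slot (previous value > 0) or raced with another worker, in which case the
// fetch_add below returns the slot.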
bool EDFThreadPoolExecutor::shouldStop() {
  // in normal cases, only do a read (prevents cache line bounces)
  if (threadsToStop_.load(std::memory_order_relaxed) <= 0 ||
      isJoin_.load(std::memory_order_relaxed)) {
    return false;
  }
  // modify only if needed
  if (threadsToStop_.fetch_sub(1, std::memory_order_relaxed) > 0) {
    return true;
  } else {
    threadsToStop_.fetch_add(1, std::memory_order_relaxed);
    return false;
  }
}

std::shared_ptr<EDFThreadPoolExecutor::Task> EDFThreadPoolExecutor::take() {
  if (UNLIKELY(shouldStop())) {
    return nullptr;
  }

  if (auto task = taskQueue_->pop()) {
    return task;
  }

  if (UNLIKELY(isJoin_.load(std::memory_order_relaxed))) {
    return nullptr;
  }

  // No tasks on the horizon, so go sleep
  numIdleThreads_.fetch_add(1, std::memory_order_seq_cst);

  SCOPE_EXIT { numIdleThreads_.fetch_sub(1, std::memory_order_seq_cst); };
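
  // This is the consumer half of the Dekker-style handshake described on
  // TaskQueue::numItems_: after the seq_cst bump of numIdleThreads_ above, a
  // concurrent add() either observes this thread as idle and posts sem_, or
  // the pop() in the loop below observes the newly pushed item.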

  for (;;) {
    if (UNLIKELY(shouldStop())) {
      return nullptr;
    }

    if (auto task = taskQueue_->pop()) {
      // It's possible to return a finished task here, in which case
      // the worker will call this function again.
      return task;
    }

    if (UNLIKELY(isJoin_.load(std::memory_order_relaxed))) {
      return nullptr;
    }

    sem_.wait();
  }
}

} // namespace folly