1 //===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===// 2 // 3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4 // See https://llvm.org/LICENSE.txt for license information. 5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6 // 7 //===----------------------------------------------------------------------===// 8 // 9 // This file defines a crude C++11 based task queue. 10 // 11 //===----------------------------------------------------------------------===// 12 13 #ifndef LLVM_SUPPORT_TASKQUEUE_H 14 #define LLVM_SUPPORT_TASKQUEUE_H 15 16 #include "llvm/Config/llvm-config.h" 17 #include "llvm/Support/ThreadPool.h" 18 #include "llvm/Support/thread.h" 19 20 #include <atomic> 21 #include <cassert> 22 #include <condition_variable> 23 #include <deque> 24 #include <functional> 25 #include <future> 26 #include <memory> 27 #include <mutex> 28 #include <utility> 29 30 namespace llvm { 31 /// TaskQueue executes serialized work on a user-defined Thread Pool. It 32 /// guarantees that if task B is enqueued after task A, task B begins after 33 /// task A completes and there is no overlap between the two. 34 class TaskQueue { 35 // Because we don't have init capture to use move-only local variables that 36 // are captured into a lambda, we create the promise inside an explicit 37 // callable struct. We want to do as much of the wrapping in the 38 // type-specialized domain (before type erasure) and then erase this into a 39 // std::function. 40 template <typename Callable> struct Task { 41 using ResultTy = std::result_of_t<Callable()>; TaskTask42 explicit Task(Callable C, TaskQueue &Parent) 43 : C(std::move(C)), P(std::make_shared<std::promise<ResultTy>>()), 44 Parent(&Parent) {} 45 46 template<typename T> invokeCallbackAndSetPromiseTask47 void invokeCallbackAndSetPromise(T*) { 48 P->set_value(C()); 49 } 50 invokeCallbackAndSetPromiseTask51 void invokeCallbackAndSetPromise(void*) { 52 C(); 53 P->set_value(); 54 } 55 operatorTask56 void operator()() noexcept { 57 ResultTy *Dummy = nullptr; 58 invokeCallbackAndSetPromise(Dummy); 59 Parent->completeTask(); 60 } 61 62 Callable C; 63 std::shared_ptr<std::promise<ResultTy>> P; 64 TaskQueue *Parent; 65 }; 66 67 public: 68 /// Construct a task queue with no work. TaskQueue(ThreadPool & Scheduler)69 TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) { (void)Scheduler; } 70 71 /// Blocking destructor: the queue will wait for all work to complete. ~TaskQueue()72 ~TaskQueue() { 73 Scheduler.wait(); 74 assert(Tasks.empty()); 75 } 76 77 /// Asynchronous submission of a task to the queue. The returned future can be 78 /// used to wait for the task (and all previous tasks that have not yet 79 /// completed) to finish. 80 template <typename Callable> async(Callable && C)81 std::future<std::result_of_t<Callable()>> async(Callable &&C) { 82 #if !LLVM_ENABLE_THREADS 83 static_assert(false, 84 "TaskQueue requires building with LLVM_ENABLE_THREADS!"); 85 #endif 86 Task<Callable> T{std::move(C), *this}; 87 using ResultTy = std::result_of_t<Callable()>; 88 std::future<ResultTy> F = T.P->get_future(); 89 { 90 std::lock_guard<std::mutex> Lock(QueueLock); 91 // If there's already a task in flight, just queue this one up. If 92 // there is not a task in flight, bypass the queue and schedule this 93 // task immediately. 94 if (IsTaskInFlight) 95 Tasks.push_back(std::move(T)); 96 else { 97 Scheduler.async(std::move(T)); 98 IsTaskInFlight = true; 99 } 100 } 101 return F; 102 } 103 104 private: completeTask()105 void completeTask() { 106 // We just completed a task. If there are no more tasks in the queue, 107 // update IsTaskInFlight to false and stop doing work. Otherwise 108 // schedule the next task (while not holding the lock). 109 std::function<void()> Continuation; 110 { 111 std::lock_guard<std::mutex> Lock(QueueLock); 112 if (Tasks.empty()) { 113 IsTaskInFlight = false; 114 return; 115 } 116 117 Continuation = std::move(Tasks.front()); 118 Tasks.pop_front(); 119 } 120 Scheduler.async(std::move(Continuation)); 121 } 122 123 /// The thread pool on which to run the work. 124 ThreadPool &Scheduler; 125 126 /// State which indicates whether the queue currently is currently processing 127 /// any work. 128 bool IsTaskInFlight = false; 129 130 /// Mutex for synchronizing access to the Tasks array. 131 std::mutex QueueLock; 132 133 /// Tasks waiting for execution in the queue. 134 std::deque<std::function<void()>> Tasks; 135 }; 136 } // namespace llvm 137 138 #endif // LLVM_SUPPORT_TASKQUEUE_H 139