1 /* 2 ** Copyright 2018 Bloomberg Finance L.P. 3 ** 4 ** Licensed under the Apache License, Version 2.0 (the "License"); 5 ** you may not use this file except in compliance with the License. 6 ** You may obtain a copy of the License at 7 ** 8 ** http://www.apache.org/licenses/LICENSE-2.0 9 ** 10 ** Unless required by applicable law or agreed to in writing, software 11 ** distributed under the License is distributed on an "AS IS" BASIS, 12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 ** See the License for the specific language governing permissions and 14 ** limitations under the License. 15 */ 16 #ifndef BLOOMBERG_QUANTUM_DISPATCHER_H 17 #define BLOOMBERG_QUANTUM_DISPATCHER_H 18 19 #include <quantum/quantum_context.h> 20 #include <quantum/quantum_configuration.h> 21 #include <quantum/quantum_macros.h> 22 #include <quantum/util/quantum_drain_guard.h> 23 #include <iterator> 24 #include <chrono> 25 26 namespace Bloomberg { 27 namespace quantum { 28 29 //============================================================================================== 30 // class Dispatcher 31 //============================================================================================== 32 /// @class Dispatcher. 33 /// @brief Parallel execution engine used to run coroutines or IO tasks asynchronously. 34 /// This class is the main entry point into the library. 35 /// @warning Please read the following for [exception safety](https://www.boost.org/doc/libs/1_68_0/libs/coroutine2/doc/html/coroutine2/coroutine/asymmetric.html#coroutine2.coroutine.asymmetric.exceptions) 36 class Dispatcher : public ITerminate 37 { 38 public: 39 using ContextTag = ThreadContextTag; 40 41 /// @brief Constructor. 42 /// @oaram[in] config The configuration for the Quantum dispatcher. 43 explicit Dispatcher(const Configuration& config); 44 45 /// @brief Destructor. 46 /// @details Destroys the task dispatcher object. This will wait until all coroutines complete, signal 47 /// all worker threads (coroutine and IO) to exit and join them. 48 ~Dispatcher(); 49 50 /// @brief Post a coroutine to run asynchronously. 51 /// @details This method will post the coroutine on any thread available. Typically it will pick one which has the 52 /// smallest number of concurrent coroutines executing at the time of the post. 53 /// @tparam RET Type of future returned by this coroutine. 54 /// @tparam FUNC Callable object type which will be wrapped in a coroutine. Can be a standalone function, a method, 55 /// an std::function, a functor generated via std::bind or a lambda. The signature of the callable 56 /// object must strictly be 'int f(CoroContext<RET>::Ptr, ...)'. 57 /// @tparam ARGS Argument types passed to FUNC. 58 /// @param[in] func Callable object. 59 /// @param[in] args Variable list of arguments passed to the callable object. 60 /// @return A pointer to a thread context object. 61 /// @note This function is non-blocking and returns immediately. The returned thread context cannot be used to chain 62 /// further coroutines. 63 /// @warning User functions should *never* catch all exceptions with (...) as it may block coroutine stack 64 /// unwind exceptions from propagating which are not derived from std::exception (see link at the top) 65 template <class RET = Deprecated, class FUNC, class ... ARGS> 66 auto post(FUNC&& func, ARGS&&... args)->ThreadContextPtr<decltype(coroResult(func))>; 67 68 /// @brief Version 2 of the API which supports a simpler coroutine signature (see documentation). 69 template <class RET = Deprecated, class FUNC, class ... ARGS> 70 auto post2(FUNC&& func, ARGS&&... args)->ThreadContextPtr<decltype(resultOf2(func))>; 71 72 /// @brief Post a coroutine to run asynchronously on a specific queue (thread). 73 /// @tparam RET Type of future returned by this coroutine. 74 /// @tparam FUNC Callable object type which will be wrapped in a coroutine. Can be a standalone function, a method, 75 /// an std::function, a functor generated via std::bind or a lambda. The signature of the callable 76 /// object must strictly be 'int f(CoroContext<RET>::Ptr, ...)'. 77 /// @tparam ARGS Argument types passed to FUNC. 78 /// @param[in] queueId Id of the queue where this coroutine should run. Note that the user can specify IQueue::QueueId::Any 79 /// as a value, which is equivalent to running the simpler version of post() above. Valid range is 80 /// [0, numCoroutineThreads) or IQueue::QueueId::Any. 81 /// @param[in] isHighPriority If set to true, the coroutine will be scheduled to run immediately after the currently 82 /// executing coroutine on 'queueId' has completed or has yielded. 83 /// @param[in] func Callable object. 84 /// @param[in] args Variable list of arguments passed to the callable object. 85 /// @return A pointer to a thread context object. 86 /// @note This function is non-blocking and returns immediately. The returned thread context cannot be used to chain 87 /// further coroutines. 88 /// @warning User functions should *never* catch all exceptions with (...) as it may block coroutine stack 89 /// unwind exceptions from propagating which are not derived from std::exception (see link at the top). 90 template <class RET = Deprecated, class FUNC, class ... ARGS> 91 auto post(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args) 92 ->ThreadContextPtr<decltype(coroResult(func))>; 93 94 /// @brief Version 2 of the API which supports a simpler coroutine signature (see documentation). 95 template <class RET = Deprecated, class FUNC, class ... ARGS> 96 auto post2(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args) 97 ->ThreadContextPtr<decltype(resultOf2(func))>; 98 99 /// @brief Post the first coroutine in a continuation chain to run asynchronously. 100 /// @tparam RET Type of future returned by this coroutine. 101 /// @tparam FUNC Callable object type which will be wrapped in a coroutine. Can be a standalone function, a method, 102 /// an std::function, a functor generated via std::bind or a lambda. The signature of the callable 103 /// object must strictly be 'int f(CoroContext<RET>::Ptr, ...)'. 104 /// @tparam ARGS Argument types passed to FUNC. 105 /// @param[in] func Callable object. 106 /// @param[in] args Variable list of arguments passed to the callable object. 107 /// @return A pointer to a thread context object. 108 /// @note This function is non-blocking and returns immediately. The returned context can be used to chain other 109 /// coroutines which will run sequentially. 110 /// @warning User functions should *never* catch all exceptions with (...) as it may block coroutine stack 111 /// unwind exceptions from propagating which are not derived from std::exception (see link at the top). 112 template <class RET = Deprecated, class FUNC, class ... ARGS> 113 auto postFirst(FUNC&& func, ARGS&&... args)->ThreadContextPtr<decltype(coroResult(func))>; 114 115 /// @brief Version 2 of the API which supports a simpler coroutine signature (see documentation). 116 template <class RET = Deprecated, class FUNC, class ... ARGS> 117 auto postFirst2(FUNC&& func, ARGS&&... args)->ThreadContextPtr<decltype(resultOf2(func))>; 118 119 /// @brief Post the first coroutine in a continuation chain to run asynchronously on a specific queue (thread). 120 /// @tparam RET Type of future returned by this coroutine. 121 /// @tparam FUNC Callable object type which will be wrapped in a coroutine. Can be a standalone function, a method, 122 /// an std::function, a functor generated via std::bind or a lambda. The signature of the callable 123 /// object must strictly be 'int f(CoroContext<RET>::Ptr, ...)'. 124 /// @tparam ARGS Argument types passed to FUNC. 125 /// @param[in] queueId Id of the queue where this coroutine should run. Note that the user can specify IQueue::QueueId::Any 126 /// as a value, which is equivalent to running the simpler version of post() above. Valid range is 127 /// [0, numCoroutineThreads) or IQueue::QueueId::Any. 128 /// @param[in] isHighPriority If set to true, the coroutine will be scheduled to run immediately after the currently 129 /// executing coroutine on 'queueId' has completed or has yielded. 130 /// @param[in] func Callable object. 131 /// @param[in] args Variable list of arguments passed to the callable object. 132 /// @return A pointer to a thread context object. 133 /// @note This function is non-blocking and returns immediately. The returned context can be used to chain other 134 /// coroutines which will run sequentially. 135 /// @warning User functions should *never* catch all exceptions with (...) as it may block coroutine stack 136 /// unwind exceptions from propagating which are not derived from std::exception (see link at the top). 137 template <class RET = Deprecated, class FUNC, class ... ARGS> 138 auto postFirst(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args) 139 ->ThreadContextPtr<decltype(coroResult(func))>; 140 141 /// @brief Version 2 of the API which supports a simpler coroutine signature (see documentation). 142 template <class RET = Deprecated, class FUNC, class ... ARGS> 143 auto postFirst2(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args) 144 ->ThreadContextPtr<decltype(resultOf2(func))>; 145 146 /// @brief Post a blocking IO (or long running) task to run asynchronously on the IO thread pool. 147 /// @tparam RET Type of future returned by this task. 148 /// @tparam FUNC Callable object type. Can be a standalone function, a method, an std::function, 149 /// a functor generated via std::bind or a lambda. The signature of the callable 150 /// object must strictly be 'int f(ThreadPromise<RET>::Ptr, ...)'. 151 /// @tparam ARGS Argument types passed to FUNC. 152 /// @param[in] func Callable object. 153 /// @param[in] args Variable list of arguments passed to the callable object. 154 /// @return A pointer to a thread future object. 155 /// @note This function is non-blocking and returns immediately. The passed function will not be wrapped in a coroutine. 156 template <class RET = Deprecated, class FUNC, class ... ARGS> 157 auto postAsyncIo(FUNC&& func, ARGS&&... args)->ThreadFuturePtr<decltype(ioResult(func))>; 158 159 /// @brief Version 2 of the API which supports a simpler IO task signature (see documentation). 160 template <class RET = Deprecated, class FUNC, class ... ARGS> 161 auto postAsyncIo2(FUNC&& func, ARGS&&... args)->ThreadFuturePtr<decltype(resultOf2(func))>; 162 163 /// @brief Post a blocking IO (or long running) task to run asynchronously on a specific thread in the IO thread pool. 164 /// @tparam RET Type of future returned by this task. 165 /// @tparam FUNC Callable object type. Can be a standalone function, a method, an std::function, 166 /// a functor generated via std::bind or a lambda. The signature of the callable 167 /// object must strictly be 'int f(ThreadPromise<RET>::Ptr, ...)'. 168 /// @tparam ARGS Argument types passed to FUNC. 169 /// @param[in] queueId Id of the queue where this task should run. Note that the user can specify IQueue::QueueId::Any 170 /// as a value, which is equivalent to running the simpler version of postAsyncIo() above. Valid range is 171 /// [0, numCoroutineThreads) or IQueue::QueueId::Any. 172 /// @param[in] isHighPriority If set to true, the task will be scheduled to run immediately. 173 /// @param[in] func Callable object. 174 /// @param[in] args Variable list of arguments passed to the callable object. 175 /// @return A pointer to a thread future object. 176 /// @note This function is non-blocking and returns immediately. The passed function will not be wrapped in a coroutine. 177 template <class RET = Deprecated, class FUNC, class ... ARGS> 178 auto postAsyncIo(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args) 179 ->ThreadFuturePtr<decltype(ioResult(func))>; 180 181 /// @brief Version 2 of the API which supports a simpler IO task signature (see documentation). 182 template <class RET = Deprecated, class FUNC, class ... ARGS> 183 auto postAsyncIo2(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args) 184 ->ThreadFuturePtr<decltype(resultOf2(func))>; 185 186 /// @brief Applies the given unary function to all the elements in the range [first,last). 187 /// This function runs in parallel. 188 /// @tparam RET The return value of the unary function. 189 /// @tparam INPUT_IT The type of iterator. 190 /// @tparam FUNC A unary function of type 'RET(VoidContextPtr, *INPUT_IT)'. 191 /// @oaram[in] first The first element in the range. 192 /// @oaram[in] last The last element in the range (exclusive). 193 /// @oaram[in] func The unary function. 194 /// @return A vector of future values corresponding to the output of 'func' on every element in the range. 195 /// @note Use this function if InputIt meets the requirement of a RandomAccessIterator 196 /// @note Each func invocation will run inside its own coroutine instance. 197 /// Prefer this function over forEachBatch() if performing IO inside FUNC. 198 /// @warning The VoidContextPtr can be used to yield() or to post additional coroutines or IO tasks. 199 /// However it should *not* be set and this will result in undefined behavior. 200 template <class RET = Deprecated, 201 class INPUT_IT, 202 class FUNC, 203 class = Traits::IsInputIterator<INPUT_IT>> 204 auto forEach(INPUT_IT first, INPUT_IT last, FUNC&& func) 205 ->ThreadContextPtr<std::vector<decltype(coroResult(func))>>; 206 207 /// @brief Same as forEach() but takes a length as second argument in case INPUT_IT 208 /// is not a random access iterator. 209 template <class RET = Deprecated, 210 class INPUT_IT, 211 class FUNC> 212 auto forEach(INPUT_IT first, size_t num, FUNC&& func) 213 ->ThreadContextPtr<std::vector<decltype(coroResult(func))>>; 214 215 /// @brief The batched version of forEach(). This function applies the given unary function 216 /// to all the elements in the range [first,last). This function runs serially with respect 217 /// to other functions in the same batch. 218 /// @return A vector of value vectors (i.e. one per batch). 219 /// @note Use this function if InputIt meets the requirement of a RandomAccessIterator. 220 /// @note The input range is split equally among coroutines and executed in batches. This function 221 /// achieves higher throughput rates than the non-batched mode, if FUNC is CPU-bound. 222 template <class RET = Deprecated, 223 class INPUT_IT, 224 class FUNC, 225 class = Traits::IsInputIterator<INPUT_IT>> 226 auto forEachBatch(INPUT_IT first, INPUT_IT last, FUNC&& func) 227 ->ThreadContextPtr<std::vector<std::vector<decltype(coroResult(func))>>>; 228 229 /// @brief Same as forEachBatch() but takes a length as second argument in case INPUT_IT 230 /// is not a random access iterator. 231 template <class RET = Deprecated, 232 class INPUT_IT, 233 class FUNC> 234 auto forEachBatch(INPUT_IT first, size_t num, FUNC&& func) 235 ->ThreadContextPtr<std::vector<std::vector<decltype(coroResult(func))>>>; 236 237 /// @brief Implementation of map-reduce functionality. 238 /// @tparam KEY The KEY type used for mapping and reducing. 239 /// @tparam MAPPED_TYPE The output type after a map operation. 240 /// @tparam REDUCED_TYPE The output type after a reduce operation. 241 /// @tparam MAPPER_FUNC The mapper function having the signature 242 /// 'std::vector<std::pair<KEY,MAPPED_TYPE>>(VoidContextPtr, *INPUT_IT)' 243 /// @tparam REDUCER_FUNC The reducer function having the signature 244 /// 'std::pair<KEY,REDUCED_TYPE>(VoidContextPtr, std::pair<KEY, std::vector<MAPPED_TYPE>>&&)' 245 /// @tparam INPUT_IT The iterator type. 246 /// @oaram[in] first The start iterator to a list of items to be processed in the range [first,last). 247 /// @oaram[in] last The end iterator to a list of items (not inclusive). 248 /// @oaram[in] mapper The mapper function. 249 /// @oaram[in] reducer The reducer function. 250 /// @return A future to a reduced map of values. 251 /// @note Use this function if InputIt meets the requirement of a RandomAccessIterator. 252 /// @warning The VoidContextPtr can be used to yield() or to post additional coroutines or IO tasks. 253 /// However it should *not* be set and this will result in undefined behavior. 254 template <class KEY = Deprecated, 255 class MAPPED_TYPE = Deprecated, 256 class REDUCED_TYPE = Deprecated, 257 class MAPPER_FUNC, 258 class REDUCER_FUNC, 259 class INPUT_IT, 260 class = Traits::IsInputIterator<INPUT_IT>> 261 auto mapReduce(INPUT_IT first, 262 INPUT_IT last, 263 MAPPER_FUNC mapper, 264 REDUCER_FUNC reducer)-> 265 ThreadContextPtr<std::map<decltype(mappedKeyOf(mapper)), decltype(reducedTypeOf(reducer))>>; 266 267 /// @brief Same as mapReduce() but takes a length as second argument in case INPUT_IT 268 /// is not a random access iterator. 269 template <class KEY = Deprecated, 270 class MAPPED_TYPE = Deprecated, 271 class REDUCED_TYPE = Deprecated, 272 class MAPPER_FUNC, 273 class REDUCER_FUNC, 274 class INPUT_IT, 275 class = Traits::IsInputIterator<INPUT_IT>> 276 auto mapReduce(INPUT_IT first, 277 size_t num, 278 MAPPER_FUNC mapper, 279 REDUCER_FUNC reducer)-> 280 ThreadContextPtr<std::map<decltype(mappedKeyOf(mapper)), decltype(reducedTypeOf(reducer))>>; 281 282 /// @brief This version of mapReduce() runs both the mapper and the reducer functions in batches 283 /// for improved performance. This should be used in the case where the functions are 284 /// more CPU intensive with little or no IO. 285 /// @note Use this function if InputIt meets the requirement of a RandomAccessIterator. 286 template <class KEY = Deprecated, 287 class MAPPED_TYPE = Deprecated, 288 class REDUCED_TYPE = Deprecated, 289 class MAPPER_FUNC, 290 class REDUCER_FUNC, 291 class INPUT_IT, 292 class = Traits::IsInputIterator<INPUT_IT>> 293 auto mapReduceBatch(INPUT_IT first, 294 INPUT_IT last, 295 MAPPER_FUNC mapper, 296 REDUCER_FUNC reducer)-> 297 ThreadContextPtr<std::map<decltype(mappedKeyOf(mapper)), decltype(reducedTypeOf(reducer))>>; 298 299 /// @brief Same as mapReduceBatch() but takes a length as second argument in case INPUT_IT 300 /// is not a random access iterator. 301 template <class KEY = Deprecated, 302 class MAPPED_TYPE = Deprecated, 303 class REDUCED_TYPE = Deprecated, 304 class MAPPER_FUNC, 305 class REDUCER_FUNC, 306 class INPUT_IT, 307 class = Traits::IsInputIterator<INPUT_IT>> 308 auto mapReduceBatch(INPUT_IT first, 309 size_t num, 310 MAPPER_FUNC mapper, 311 REDUCER_FUNC reducer)-> 312 ThreadContextPtr<std::map<decltype(mappedKeyOf(mapper)), decltype(reducedTypeOf(reducer))>>; 313 314 /// @brief Signal all threads to immediately terminate and exit. All other pending coroutines and IO tasks will not complete. 315 /// Call this function for a fast shutdown of the dispatcher. 316 /// @note This function blocks. 317 void terminate() final; 318 319 /// @brief Returns the total number of queued tasks for the specified type and queue id. 320 /// @param[in] type The type of queue. 321 /// @param[in] queueId The queue number to query. Valid range is [0, numCoroutineThreads) for IQueue::QueueType::Coro, 322 /// [0, numIoThreads) for IQueue::QueueType::IO and IQueue::QueueId::All for either. 323 /// @return The total number of queued tasks including the currently executing one. 324 /// @note IQueue::QueueId::Same is an invalid queue id. IQueue::QueueId::Any is only valid for IO queue type. When 325 /// type IQueue::QueueType::All is specified, the queueId is not used and must be left at default value. 326 size_t size(IQueue::QueueType type = IQueue::QueueType::All, 327 int queueId = (int)IQueue::QueueId::All) const; 328 329 /// @brief Check if the specified type and queue id is empty (i.e. there are no running tasks) 330 /// @param[in] type The type of queue. 331 /// @param[in] queueId The queue number to query. Valid range is [0, numCoroutineThreads) for IQueue::QueueType::Coro, 332 /// [0, numIoThreads) for IQueue::QueueType::IO and IQueue::QueueId::All for either. 333 /// @return True if empty, false otherwise. 334 /// @note IQueue::QueueId::Same is an invalid queue id. IQueue::QueueId::Any is only valid for IO queue type. When 335 /// type IQueue::QueueType::All is specified, the queueId is not used and must be left at default value. 336 bool empty(IQueue::QueueType type = IQueue::QueueType::All, 337 int queueId = (int)IQueue::QueueId::All) const; 338 339 /// @brief Drains all queues on this dispatcher object. 340 /// @param[in] timeout Maximum time for this function to wait. Set to -1 to wait indefinitely until all queues drain. 341 /// @param[in] isFinal If set to true, the dispatcher will not allow any more processing after the drain completes. 342 /// @return True if everything drains before timeout, false otherwise. 343 /// @note This function blocks until all coroutines and IO tasks have completed. During this time, posting 344 /// of new tasks is disabled unless they are posted from within an already executing coroutine. 345 bool drain(std::chrono::milliseconds timeout = std::chrono::milliseconds(-1), 346 bool isFinal = false); 347 348 /// @brief Returns the number of underlying coroutine threads as specified in the constructor. If -1 was passed 349 /// than this number essentially indicates the number of cores. 350 /// @return The number of threads. 351 /// @note Each thread services its own queueId, therefore this number can be used when assigning coroutines 352 /// to a specific queue. 353 int getNumCoroutineThreads() const; 354 355 /// @brief Returns the number of underlying IO threads as specified in the constructor. 356 /// @return The number of threads. 357 /// @note Each thread services its own queueId, therefore this number can be used when assigning IO tasks 358 /// to a specific queue. 359 int getNumIoThreads() const; 360 361 /// @brief Gets the range [minQueueId, maxQueueId] of coroutine queueIds covered by IQueue::QueueId::Any 362 /// when using Dispatcher::post 363 /// @return queueIdRange The range of queueIds that IQueue::QueueId::Any covers 364 const std::pair<int, int>& getCoroQueueIdRangeForAny() const; 365 366 /// @brief Returns a statistics object for the specified type and queue id. 367 /// @param[in] type The type of queue. 368 /// @param[in] queueId The queue number to query. Valid range is [0, numCoroutineThreads) for IQueue::QueueType::Coro, 369 /// [0, numIoThreads) for IQueue::QueueType::IO and IQueue::QueueId::All for either. 370 /// @return Aggregated or individual queue stats. 371 /// @note IQueue::QueueId::Same is an invalid queue id. IQueue::QueueId::Any is only valid for IO queue type. When 372 /// type IQueue::QueueType::All is specified, the queueId is not used and must be left at default value. 373 QueueStatistics stats(IQueue::QueueType type = IQueue::QueueType::All, 374 int queueId = (int)IQueue::QueueId::All); 375 376 /// @brief Resets all coroutine and IO queue counters. 377 void resetStats(); 378 379 private: 380 template <class RET, class FUNC, class ... ARGS> 381 ThreadContextPtr<RET> 382 postImpl(int queueId, bool isHighPriority, ITask::Type type, FUNC&& func, ARGS&&... args); 383 384 template <class RET, class FUNC, class ... ARGS> 385 ThreadFuturePtr<RET> 386 postAsyncIoImpl(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args); 387 388 //Members 389 DispatcherCore _dispatcher; 390 std::atomic_bool _drain; 391 std::atomic_bool _terminated; 392 }; 393 394 using TaskDispatcher = Dispatcher; //alias 395 396 }} 397 398 #include <quantum/impl/quantum_dispatcher_impl.h> 399 400 #endif //BLOOMBERG_QUANTUM_DISPATCHER_H 401