1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 **     http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 #ifndef BLOOMBERG_QUANTUM_DISPATCHER_H
17 #define BLOOMBERG_QUANTUM_DISPATCHER_H
18 
19 #include <quantum/quantum_context.h>
20 #include <quantum/quantum_configuration.h>
21 #include <quantum/quantum_macros.h>
22 #include <quantum/util/quantum_drain_guard.h>
23 #include <iterator>
24 #include <chrono>
25 
26 namespace Bloomberg {
27 namespace quantum {
28 
29 //==============================================================================================
30 //                                 class Dispatcher
31 //==============================================================================================
32 /// @class Dispatcher.
33 /// @brief Parallel execution engine used to run coroutines or IO tasks asynchronously.
34 ///        This class is the main entry point into the library.
35 /// @warning Please read the following for [exception safety](https://www.boost.org/doc/libs/1_68_0/libs/coroutine2/doc/html/coroutine2/coroutine/asymmetric.html#coroutine2.coroutine.asymmetric.exceptions)
36 class Dispatcher : public ITerminate
37 {
38 public:
39     using ContextTag = ThreadContextTag;
40 
41     /// @brief Constructor.
42     /// @oaram[in] config The configuration for the Quantum dispatcher.
43     explicit Dispatcher(const Configuration& config);
44 
45     /// @brief Destructor.
46     /// @details Destroys the task dispatcher object. This will wait until all coroutines complete, signal
47     ///          all worker threads (coroutine and IO) to exit and join them.
48     ~Dispatcher();
49 
50     /// @brief Post a coroutine to run asynchronously.
51     /// @details This method will post the coroutine on any thread available. Typically it will pick one which has the
52     ///          smallest number of concurrent coroutines executing at the time of the post.
53     /// @tparam RET Type of future returned by this coroutine.
54     /// @tparam FUNC Callable object type which will be wrapped in a coroutine. Can be a standalone function, a method,
55     ///              an std::function, a functor generated via std::bind or a lambda. The signature of the callable
56     ///              object must strictly be 'int f(CoroContext<RET>::Ptr, ...)'.
57     /// @tparam ARGS Argument types passed to FUNC.
58     /// @param[in] func Callable object.
59     /// @param[in] args Variable list of arguments passed to the callable object.
60     /// @return A pointer to a thread context object.
61     /// @note This function is non-blocking and returns immediately. The returned thread context cannot be used to chain
62     ///       further coroutines.
63     /// @warning User functions should *never* catch all exceptions with (...) as it may block coroutine stack
64     ///          unwind exceptions from propagating which are not derived from std::exception (see link at the top)
65     template <class RET = Deprecated, class FUNC, class ... ARGS>
66     auto post(FUNC&& func, ARGS&&... args)->ThreadContextPtr<decltype(coroResult(func))>;
67 
68     /// @brief Version 2 of the API which supports a simpler coroutine signature (see documentation).
69     template <class RET = Deprecated, class FUNC, class ... ARGS>
70     auto post2(FUNC&& func, ARGS&&... args)->ThreadContextPtr<decltype(resultOf2(func))>;
71 
72     /// @brief Post a coroutine to run asynchronously on a specific queue (thread).
73     /// @tparam RET Type of future returned by this coroutine.
74     /// @tparam FUNC Callable object type which will be wrapped in a coroutine. Can be a standalone function, a method,
75     ///              an std::function, a functor generated via std::bind or a lambda. The signature of the callable
76     ///              object must strictly be 'int f(CoroContext<RET>::Ptr, ...)'.
77     /// @tparam ARGS Argument types passed to FUNC.
78     /// @param[in] queueId Id of the queue where this coroutine should run. Note that the user can specify IQueue::QueueId::Any
79     ///                    as a value, which is equivalent to running the simpler version of post() above. Valid range is
80     ///                    [0, numCoroutineThreads) or IQueue::QueueId::Any.
81     /// @param[in] isHighPriority If set to true, the coroutine will be scheduled to run immediately after the currently
82     ///                           executing coroutine on 'queueId' has completed or has yielded.
83     /// @param[in] func Callable object.
84     /// @param[in] args Variable list of arguments passed to the callable object.
85     /// @return A pointer to a thread context object.
86     /// @note This function is non-blocking and returns immediately. The returned thread context cannot be used to chain
87     ///       further coroutines.
88     /// @warning User functions should *never* catch all exceptions with (...) as it may block coroutine stack
89     ///          unwind exceptions from propagating which are not derived from std::exception (see link at the top).
90     template <class RET = Deprecated, class FUNC, class ... ARGS>
91     auto post(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args)
92         ->ThreadContextPtr<decltype(coroResult(func))>;
93 
94     /// @brief Version 2 of the API which supports a simpler coroutine signature (see documentation).
95     template <class RET = Deprecated, class FUNC, class ... ARGS>
96     auto post2(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args)
97         ->ThreadContextPtr<decltype(resultOf2(func))>;
98 
99     /// @brief Post the first coroutine in a continuation chain to run asynchronously.
100     /// @tparam RET Type of future returned by this coroutine.
101     /// @tparam FUNC Callable object type which will be wrapped in a coroutine. Can be a standalone function, a method,
102     ///              an std::function, a functor generated via std::bind or a lambda. The signature of the callable
103     ///              object must strictly be 'int f(CoroContext<RET>::Ptr, ...)'.
104     /// @tparam ARGS Argument types passed to FUNC.
105     /// @param[in] func Callable object.
106     /// @param[in] args Variable list of arguments passed to the callable object.
107     /// @return A pointer to a thread context object.
108     /// @note This function is non-blocking and returns immediately. The returned context can be used to chain other
109     ///       coroutines which will run sequentially.
110     /// @warning User functions should *never* catch all exceptions with (...) as it may block coroutine stack
111     ///          unwind exceptions from propagating which are not derived from std::exception (see link at the top).
112     template <class RET = Deprecated, class FUNC, class ... ARGS>
113     auto postFirst(FUNC&& func, ARGS&&... args)->ThreadContextPtr<decltype(coroResult(func))>;
114 
115     /// @brief Version 2 of the API which supports a simpler coroutine signature (see documentation).
116     template <class RET = Deprecated, class FUNC, class ... ARGS>
117     auto postFirst2(FUNC&& func, ARGS&&... args)->ThreadContextPtr<decltype(resultOf2(func))>;
118 
119     /// @brief Post the first coroutine in a continuation chain to run asynchronously on a specific queue (thread).
120     /// @tparam RET Type of future returned by this coroutine.
121     /// @tparam FUNC Callable object type which will be wrapped in a coroutine. Can be a standalone function, a method,
122     ///              an std::function, a functor generated via std::bind or a lambda. The signature of the callable
123     ///              object must strictly be 'int f(CoroContext<RET>::Ptr, ...)'.
124     /// @tparam ARGS Argument types passed to FUNC.
125     /// @param[in] queueId Id of the queue where this coroutine should run. Note that the user can specify IQueue::QueueId::Any
126     ///                    as a value, which is equivalent to running the simpler version of post() above. Valid range is
127     ///                    [0, numCoroutineThreads) or IQueue::QueueId::Any.
128     /// @param[in] isHighPriority If set to true, the coroutine will be scheduled to run immediately after the currently
129     ///                           executing coroutine on 'queueId' has completed or has yielded.
130     /// @param[in] func Callable object.
131     /// @param[in] args Variable list of arguments passed to the callable object.
132     /// @return A pointer to a thread context object.
133     /// @note This function is non-blocking and returns immediately. The returned context can be used to chain other
134     ///       coroutines which will run sequentially.
135     /// @warning User functions should *never* catch all exceptions with (...) as it may block coroutine stack
136     ///          unwind exceptions from propagating which are not derived from std::exception (see link at the top).
137     template <class RET = Deprecated, class FUNC, class ... ARGS>
138     auto postFirst(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args)
139         ->ThreadContextPtr<decltype(coroResult(func))>;
140 
141     /// @brief Version 2 of the API which supports a simpler coroutine signature (see documentation).
142     template <class RET = Deprecated, class FUNC, class ... ARGS>
143     auto postFirst2(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args)
144         ->ThreadContextPtr<decltype(resultOf2(func))>;
145 
146     /// @brief Post a blocking IO (or long running) task to run asynchronously on the IO thread pool.
147     /// @tparam RET Type of future returned by this task.
148     /// @tparam FUNC Callable object type. Can be a standalone function, a method, an std::function,
149     ///              a functor generated via std::bind or a lambda. The signature of the callable
150     ///              object must strictly be 'int f(ThreadPromise<RET>::Ptr, ...)'.
151     /// @tparam ARGS Argument types passed to FUNC.
152     /// @param[in] func Callable object.
153     /// @param[in] args Variable list of arguments passed to the callable object.
154     /// @return A pointer to a thread future object.
155     /// @note This function is non-blocking and returns immediately. The passed function will not be wrapped in a coroutine.
156     template <class RET = Deprecated, class FUNC, class ... ARGS>
157     auto postAsyncIo(FUNC&& func, ARGS&&... args)->ThreadFuturePtr<decltype(ioResult(func))>;
158 
159     /// @brief Version 2 of the API which supports a simpler IO task signature (see documentation).
160     template <class RET = Deprecated, class FUNC, class ... ARGS>
161     auto postAsyncIo2(FUNC&& func, ARGS&&... args)->ThreadFuturePtr<decltype(resultOf2(func))>;
162 
163     /// @brief Post a blocking IO (or long running) task to run asynchronously on a specific thread in the IO thread pool.
164     /// @tparam RET Type of future returned by this task.
165     /// @tparam FUNC Callable object type. Can be a standalone function, a method, an std::function,
166     ///              a functor generated via std::bind or a lambda. The signature of the callable
167     ///              object must strictly be 'int f(ThreadPromise<RET>::Ptr, ...)'.
168     /// @tparam ARGS Argument types passed to FUNC.
169     /// @param[in] queueId Id of the queue where this task should run. Note that the user can specify IQueue::QueueId::Any
170     ///                    as a value, which is equivalent to running the simpler version of postAsyncIo() above. Valid range is
171     ///                    [0, numCoroutineThreads) or IQueue::QueueId::Any.
172     /// @param[in] isHighPriority If set to true, the task will be scheduled to run immediately.
173     /// @param[in] func Callable object.
174     /// @param[in] args Variable list of arguments passed to the callable object.
175     /// @return A pointer to a thread future object.
176     /// @note This function is non-blocking and returns immediately. The passed function will not be wrapped in a coroutine.
177     template <class RET = Deprecated, class FUNC, class ... ARGS>
178     auto postAsyncIo(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args)
179         ->ThreadFuturePtr<decltype(ioResult(func))>;
180 
181     /// @brief Version 2 of the API which supports a simpler IO task signature (see documentation).
182     template <class RET = Deprecated, class FUNC, class ... ARGS>
183     auto postAsyncIo2(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args)
184         ->ThreadFuturePtr<decltype(resultOf2(func))>;
185 
186     /// @brief Applies the given unary function to all the elements in the range [first,last).
187     ///        This function runs in parallel.
188     /// @tparam RET The return value of the unary function.
189     /// @tparam INPUT_IT The type of iterator.
190     /// @tparam FUNC A unary function of type 'RET(VoidContextPtr, *INPUT_IT)'.
191     /// @oaram[in] first The first element in the range.
192     /// @oaram[in] last The last element in the range (exclusive).
193     /// @oaram[in] func The unary function.
194     /// @return A vector of future values corresponding to the output of 'func' on every element in the range.
195     /// @note Use this function if InputIt meets the requirement of a RandomAccessIterator
196     /// @note Each func invocation will run inside its own coroutine instance.
197     ///       Prefer this function over forEachBatch() if performing IO inside FUNC.
198     /// @warning The VoidContextPtr can be used to yield() or to post additional coroutines or IO tasks.
199     ///          However it should *not* be set and this will result in undefined behavior.
200     template <class RET = Deprecated,
201               class INPUT_IT,
202               class FUNC,
203               class = Traits::IsInputIterator<INPUT_IT>>
204     auto forEach(INPUT_IT first, INPUT_IT last, FUNC&& func)
205         ->ThreadContextPtr<std::vector<decltype(coroResult(func))>>;
206 
207     /// @brief Same as forEach() but takes a length as second argument in case INPUT_IT
208     ///        is not a random access iterator.
209     template <class RET = Deprecated,
210               class INPUT_IT,
211               class FUNC>
212     auto forEach(INPUT_IT first, size_t num, FUNC&& func)
213         ->ThreadContextPtr<std::vector<decltype(coroResult(func))>>;
214 
215     /// @brief The batched version of forEach(). This function applies the given unary function
216     ///        to all the elements in the range [first,last). This function runs serially with respect
217     ///        to other functions in the same batch.
218     /// @return A vector of value vectors (i.e. one per batch).
219     /// @note Use this function if InputIt meets the requirement of a RandomAccessIterator.
220     /// @note The input range is split equally among coroutines and executed in batches. This function
221     ///       achieves higher throughput rates than the non-batched mode, if FUNC is CPU-bound.
222     template <class RET = Deprecated,
223               class INPUT_IT,
224               class FUNC,
225               class = Traits::IsInputIterator<INPUT_IT>>
226     auto forEachBatch(INPUT_IT first, INPUT_IT last, FUNC&& func)
227         ->ThreadContextPtr<std::vector<std::vector<decltype(coroResult(func))>>>;
228 
229     /// @brief Same as forEachBatch() but takes a length as second argument in case INPUT_IT
230     ///        is not a random access iterator.
231     template <class RET = Deprecated,
232               class INPUT_IT,
233               class FUNC>
234     auto forEachBatch(INPUT_IT first, size_t num, FUNC&& func)
235         ->ThreadContextPtr<std::vector<std::vector<decltype(coroResult(func))>>>;
236 
237     /// @brief Implementation of map-reduce functionality.
238     /// @tparam KEY The KEY type used for mapping and reducing.
239     /// @tparam MAPPED_TYPE The output type after a map operation.
240     /// @tparam REDUCED_TYPE The output type after a reduce operation.
241     /// @tparam MAPPER_FUNC The mapper function having the signature
242     ///         'std::vector<std::pair<KEY,MAPPED_TYPE>>(VoidContextPtr, *INPUT_IT)'
243     /// @tparam REDUCER_FUNC The reducer function having the signature
244     ///         'std::pair<KEY,REDUCED_TYPE>(VoidContextPtr, std::pair<KEY, std::vector<MAPPED_TYPE>>&&)'
245     /// @tparam INPUT_IT The iterator type.
246     /// @oaram[in] first The start iterator to a list of items to be processed in the range [first,last).
247     /// @oaram[in] last The end iterator to a list of items (not inclusive).
248     /// @oaram[in] mapper The mapper function.
249     /// @oaram[in] reducer The reducer function.
250     /// @return A future to a reduced map of values.
251     /// @note Use this function if InputIt meets the requirement of a RandomAccessIterator.
252     /// @warning The VoidContextPtr can be used to yield() or to post additional coroutines or IO tasks.
253     ///          However it should *not* be set and this will result in undefined behavior.
254     template <class KEY = Deprecated,
255               class MAPPED_TYPE = Deprecated,
256               class REDUCED_TYPE = Deprecated,
257               class MAPPER_FUNC,
258               class REDUCER_FUNC,
259               class INPUT_IT,
260               class = Traits::IsInputIterator<INPUT_IT>>
261     auto mapReduce(INPUT_IT first,
262                    INPUT_IT last,
263                    MAPPER_FUNC mapper,
264                    REDUCER_FUNC reducer)->
265           ThreadContextPtr<std::map<decltype(mappedKeyOf(mapper)), decltype(reducedTypeOf(reducer))>>;
266 
267     /// @brief Same as mapReduce() but takes a length as second argument in case INPUT_IT
268     ///        is not a random access iterator.
269     template <class KEY = Deprecated,
270               class MAPPED_TYPE = Deprecated,
271               class REDUCED_TYPE = Deprecated,
272               class MAPPER_FUNC,
273               class REDUCER_FUNC,
274               class INPUT_IT,
275               class = Traits::IsInputIterator<INPUT_IT>>
276     auto mapReduce(INPUT_IT first,
277                    size_t num,
278                    MAPPER_FUNC mapper,
279                    REDUCER_FUNC reducer)->
280           ThreadContextPtr<std::map<decltype(mappedKeyOf(mapper)), decltype(reducedTypeOf(reducer))>>;
281 
282     /// @brief This version of mapReduce() runs both the mapper and the reducer functions in batches
283     ///        for improved performance. This should be used in the case where the functions are
284     ///        more CPU intensive with little or no IO.
285     /// @note Use this function if InputIt meets the requirement of a RandomAccessIterator.
286     template <class KEY = Deprecated,
287               class MAPPED_TYPE = Deprecated,
288               class REDUCED_TYPE = Deprecated,
289               class MAPPER_FUNC,
290               class REDUCER_FUNC,
291               class INPUT_IT,
292               class = Traits::IsInputIterator<INPUT_IT>>
293     auto mapReduceBatch(INPUT_IT first,
294                         INPUT_IT last,
295                         MAPPER_FUNC mapper,
296                         REDUCER_FUNC reducer)->
297           ThreadContextPtr<std::map<decltype(mappedKeyOf(mapper)), decltype(reducedTypeOf(reducer))>>;
298 
299     /// @brief Same as mapReduceBatch() but takes a length as second argument in case INPUT_IT
300     ///        is not a random access iterator.
301     template <class KEY = Deprecated,
302               class MAPPED_TYPE = Deprecated,
303               class REDUCED_TYPE = Deprecated,
304               class MAPPER_FUNC,
305               class REDUCER_FUNC,
306               class INPUT_IT,
307               class = Traits::IsInputIterator<INPUT_IT>>
308     auto mapReduceBatch(INPUT_IT first,
309                         size_t num,
310                         MAPPER_FUNC mapper,
311                         REDUCER_FUNC reducer)->
312           ThreadContextPtr<std::map<decltype(mappedKeyOf(mapper)), decltype(reducedTypeOf(reducer))>>;
313 
314     /// @brief Signal all threads to immediately terminate and exit. All other pending coroutines and IO tasks will not complete.
315     ///        Call this function for a fast shutdown of the dispatcher.
316     /// @note This function blocks.
317     void terminate() final;
318 
319     /// @brief Returns the total number of queued tasks for the specified type and queue id.
320     /// @param[in] type The type of queue.
321     /// @param[in] queueId The queue number to query. Valid range is [0, numCoroutineThreads) for IQueue::QueueType::Coro,
322     ///                    [0, numIoThreads) for IQueue::QueueType::IO and IQueue::QueueId::All for either.
323     /// @return The total number of queued tasks including the currently executing one.
324     /// @note IQueue::QueueId::Same is an invalid queue id. IQueue::QueueId::Any is only valid for IO queue type. When
325     ///       type IQueue::QueueType::All is specified, the queueId is not used and must be left at default value.
326     size_t size(IQueue::QueueType type = IQueue::QueueType::All,
327                 int queueId = (int)IQueue::QueueId::All) const;
328 
329     /// @brief Check if the specified type and queue id is empty (i.e. there are no running tasks)
330     /// @param[in] type The type of queue.
331     /// @param[in] queueId The queue number to query. Valid range is [0, numCoroutineThreads) for IQueue::QueueType::Coro,
332     ///                    [0, numIoThreads) for IQueue::QueueType::IO and IQueue::QueueId::All for either.
333     /// @return True if empty, false otherwise.
334     /// @note IQueue::QueueId::Same is an invalid queue id. IQueue::QueueId::Any is only valid for IO queue type. When
335     ///       type IQueue::QueueType::All is specified, the queueId is not used and must be left at default value.
336     bool empty(IQueue::QueueType type = IQueue::QueueType::All,
337                int queueId = (int)IQueue::QueueId::All) const;
338 
339     /// @brief Drains all queues on this dispatcher object.
340     /// @param[in] timeout Maximum time for this function to wait. Set to -1 to wait indefinitely until all queues drain.
341     /// @param[in] isFinal If set to true, the dispatcher will not allow any more processing after the drain completes.
342     /// @return True if everything drains before timeout, false otherwise.
343     /// @note This function blocks until all coroutines and IO tasks have completed. During this time, posting
344     ///       of new tasks is disabled unless they are posted from within an already executing coroutine.
345     bool drain(std::chrono::milliseconds timeout = std::chrono::milliseconds(-1),
346                bool isFinal = false);
347 
348     /// @brief Returns the number of underlying coroutine threads as specified in the constructor. If -1 was passed
349     ///        than this number essentially indicates the number of cores.
350     /// @return The number of threads.
351     /// @note Each thread services its own queueId, therefore this number can be used when assigning coroutines
352     ///       to a specific queue.
353     int getNumCoroutineThreads() const;
354 
355     /// @brief Returns the number of underlying IO threads as specified in the constructor.
356     /// @return The number of threads.
357     /// @note Each thread services its own queueId, therefore this number can be used when assigning IO tasks
358     ///       to a specific queue.
359     int getNumIoThreads() const;
360 
361     /// @brief Gets the range [minQueueId, maxQueueId] of coroutine queueIds covered by IQueue::QueueId::Any
362     /// when using Dispatcher::post
363     /// @return queueIdRange The range of queueIds that IQueue::QueueId::Any covers
364     const std::pair<int, int>& getCoroQueueIdRangeForAny() const;
365 
366     /// @brief Returns a statistics object for the specified type and queue id.
367     /// @param[in] type The type of queue.
368     /// @param[in] queueId The queue number to query. Valid range is [0, numCoroutineThreads) for IQueue::QueueType::Coro,
369     ///                    [0, numIoThreads) for IQueue::QueueType::IO and IQueue::QueueId::All for either.
370     /// @return Aggregated or individual queue stats.
371     /// @note IQueue::QueueId::Same is an invalid queue id. IQueue::QueueId::Any is only valid for IO queue type. When
372     ///       type IQueue::QueueType::All is specified, the queueId is not used and must be left at default value.
373     QueueStatistics stats(IQueue::QueueType type = IQueue::QueueType::All,
374                           int queueId = (int)IQueue::QueueId::All);
375 
376     /// @brief Resets all coroutine and IO queue counters.
377     void resetStats();
378 
379 private:
380     template <class RET, class FUNC, class ... ARGS>
381     ThreadContextPtr<RET>
382     postImpl(int queueId, bool isHighPriority, ITask::Type type, FUNC&& func, ARGS&&... args);
383 
384     template <class RET, class FUNC, class ... ARGS>
385     ThreadFuturePtr<RET>
386     postAsyncIoImpl(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args);
387 
388     //Members
389     DispatcherCore              _dispatcher;
390     std::atomic_bool            _drain;
391     std::atomic_bool            _terminated;
392 };
393 
/// @brief Alternative name for the Dispatcher class; both names refer to the exact same type.
using TaskDispatcher = Dispatcher; //alias
395 
396 }}
397 
398 #include <quantum/impl/quantum_dispatcher_impl.h>
399 
400 #endif //BLOOMBERG_QUANTUM_DISPATCHER_H
401