1 /// @mainpage transwarp is a header-only C++ library for task concurrency
2 /// @details https://github.com/bloomen/transwarp
3 /// @version 2.2.2
4 /// @author Christian Blume, Guan Wang
5 /// @date 2019
6 /// @copyright MIT http://www.opensource.org/licenses/mit-license.php
7 #pragma once
#include <algorithm>
#ifndef TRANSWARP_CPP11
#include <any>
#endif
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#ifndef TRANSWARP_CPP11
#include <optional>
#endif
#include <queue>
#include <stdexcept>
#include <string>
#ifndef TRANSWARP_CPP11
#include <string_view>
#endif
#include <thread>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
34 
35 
36 #ifdef TRANSWARP_MINIMUM_TASK_SIZE
37 
38 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
39 #define TRANSWARP_DISABLE_TASK_CUSTOM_DATA
40 #endif
41 
42 #ifndef TRANSWARP_DISABLE_TASK_NAME
43 #define TRANSWARP_DISABLE_TASK_NAME
44 #endif
45 
46 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
47 #define TRANSWARP_DISABLE_TASK_PRIORITY
48 #endif
49 
50 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
51 #define TRANSWARP_DISABLE_TASK_REFCOUNT
52 #endif
53 
54 #ifndef TRANSWARP_DISABLE_TASK_TIME
55 #define TRANSWARP_DISABLE_TASK_TIME
56 #endif
57 
58 #endif
59 
60 
61 /// The transwarp namespace
62 namespace transwarp {
63 
64 
65 #ifdef TRANSWARP_CPP11
/// Minimal optional-like holder for a std::string.
/// A default-constructed object is empty; an object built from a string is valid.
class option_str {
public:
    /// Creates an empty object that converts to false
    option_str()
    : str_(),
      valid_(false)
    {}

    /// Creates a valid object holding str (not explicit, so std::string converts implicitly)
    option_str(std::string str)
    : str_(std::move(str)),
      valid_(true)
    {}

    // default copy/move semantics
    option_str(const option_str&) = default;
    option_str(option_str&&) = default;
    option_str& operator=(const option_str&) = default;
    option_str& operator=(option_str&&) = default;

    /// True if this object was constructed from a string
    operator bool() const noexcept {
        return valid_;
    }

    /// Returns the stored string; no validity check is performed
    const std::string& operator*() const noexcept {
        return str_;
    }

private:
    std::string str_;
    bool valid_;
};
94 
/// Detail namespace for internal functionality only
namespace detail {

/// Type-erased operations on a heap-allocated value that is referred to
/// through a void pointer
class storage {
public:
    virtual ~storage() = default;
    /// Returns a new storage object that handles the same value type
    virtual std::unique_ptr<storage> clone() const = 0;
    /// Deletes the value pointed to by data (a null pointer is allowed)
    virtual void destroy(void* data) const noexcept = 0;
    /// Copy-constructs a new value from src and stores its address in dest
    virtual void copy(const void* src, void*& dest) const = 0;
    /// Hands the pointer over from src to dest and nulls src
    virtual void move(void*& src, void*& dest) const noexcept = 0;
};

/// Implements the type-erased operations for values of type T
template<typename T>
class storage_impl : public storage {
public:
    std::unique_ptr<storage> clone() const override {
        return std::unique_ptr<storage>(new storage_impl);
    }
    void destroy(void* data) const noexcept override {
        // static_cast is the correct cast for void* -> T*
        delete static_cast<T*>(data);
    }
    void copy(const void* src, void*& dest) const override {
        dest = new T(*static_cast<const T*>(src));
    }
    void move(void*& src, void*& dest) const noexcept override {
        dest = src;
        src = nullptr;
    }
};

} // detail

/// A simple value class that can hold any copy-constructible value.
/// Invariant: data_ is non-null only while storage_ is non-null.
class any_data {
public:
    /// Creates an empty object
    any_data() noexcept
    : data_(nullptr)
    {}

    /// Stores a decayed copy of value.
    /// Disabled when T decays to any_data so that copying/moving a non-const
    /// any_data selects the copy/move constructor; without the constraint the
    /// forwarding constructor would win overload resolution and recurse
    /// endlessly while trying to wrap the object into itself.
    template<typename T,
             typename = typename std::enable_if<
                 !std::is_same<typename std::decay<T>::type, any_data>::value>::type>
    any_data(T&& value)
    : storage_(new detail::storage_impl<typename std::decay<T>::type>),
      data_(new typename std::decay<T>::type(std::forward<T>(value)))
    {}

    /// Deep-copies the value held by other (if any)
    any_data(const any_data& other)
    : storage_(other.storage_ ? other.storage_->clone() : nullptr),
      data_(nullptr)
    {
        if (other.data_) {
            storage_->copy(other.data_, data_);
        }
    }

    /// Copy assignment with strong exception guarantee: the copy is made first
    /// and only swapped in once it succeeded, so a throwing copy leaves this
    /// object untouched instead of pointing at an already deleted value.
    any_data& operator=(const any_data& other) {
        if (this != &other) {
            any_data copy(other);
            swap(copy);
        }
        return *this;
    }

    /// Takes over the value held by other; other is left empty
    any_data(any_data&& other) noexcept
    : storage_(std::move(other.storage_)),
      data_(other.data_)
    {
        other.data_ = nullptr;
    }

    /// Destroys the current value and takes over the one held by other; other is left empty
    any_data& operator=(any_data&& other) noexcept {
        if (this != &other) {
            reset();
            storage_ = std::move(other.storage_);
            data_ = other.data_;
            other.data_ = nullptr;
        }
        return *this;
    }

    ~any_data() {
        reset();
    }

    /// True if a value is currently held
    bool has_value() const noexcept {
        return data_ != nullptr;
    }

    /// Returns the held value as T. No type or emptiness check is performed:
    /// T must be the exact (decayed) type that was stored and has_value() must be true.
    template<typename T>
    const T& get() const {
        return *static_cast<const T*>(data_);
    }

private:
    /// Destroys the held value (if any) and leaves this object empty
    void reset() noexcept {
        if (data_) {
            storage_->destroy(data_);
            data_ = nullptr;
        }
        storage_.reset();
    }

    /// Exchanges the contents of this object and other
    void swap(any_data& other) noexcept {
        storage_.swap(other.storage_);
        std::swap(data_, other.data_);
    }

    std::unique_ptr<detail::storage> storage_;
    void* data_;
};
211 
212 using str_view = const std::string&;
213 #else
214 using any_data = std::any;
215 using option_str = std::optional<std::string>;
216 using str_view = std::string_view;
217 #endif
218 
219 
/// Enumerates the kinds of tasks, i.e. what a task's functor receives from its parents
enum class task_type {
    root,        ///< No parents
    accept,      ///< Functor accepts the futures of all parents
    accept_any,  ///< Functor accepts the future of the first parent that becomes ready
    consume,     ///< Functor consumes the results of all parents
    consume_any, ///< Functor consumes the result of the first parent that becomes ready
    wait,        ///< Functor takes no arguments; waits for all parents to finish
    wait_any,    ///< Functor takes no arguments; waits for the first parent to finish
};
230 
231 
/// Common base of all transwarp exceptions; forwards the message to std::runtime_error
class transwarp_error : public std::runtime_error {
public:
    explicit transwarp_error(const std::string& message)
    : std::runtime_error(message)
    {}
};
239 
240 
241 /// Exception thrown when a task is canceled
242 class task_canceled : public transwarp::transwarp_error {
243 public:
task_canceled(const std::string & task_repr)244     explicit task_canceled(const std::string& task_repr)
245     : transwarp::transwarp_error{"Task canceled: " + task_repr}
246     {}
247 };
248 
249 
250 /// Exception thrown when a task was destroyed prematurely
251 class task_destroyed : public transwarp::transwarp_error {
252 public:
task_destroyed(const std::string & task_repr)253     explicit task_destroyed(const std::string& task_repr)
254     : transwarp::transwarp_error{"Task destroyed: " + task_repr}
255     {}
256 };
257 
258 
259 /// Exception thrown when an invalid parameter was passed to a function
260 class invalid_parameter : public transwarp::transwarp_error {
261 public:
invalid_parameter(const std::string & parameter)262     explicit invalid_parameter(const std::string& parameter)
263     : transwarp::transwarp_error{"Invalid parameter: " + parameter}
264     {}
265 };
266 
267 
268 /// Exception thrown when a task is used in unintended ways
269 class control_error : public transwarp::transwarp_error {
270 public:
control_error(const std::string & message)271     explicit control_error(const std::string& message)
272     : transwarp::transwarp_error{"Control error: " + message}
273     {}
274 };
275 
276 
277 /// The root type. Used for tag dispatch
278 struct root_type : std::integral_constant<transwarp::task_type, transwarp::task_type::root> {};
279 constexpr transwarp::root_type root{}; ///< The root task tag
280 
281 /// The accept type. Used for tag dispatch
282 struct accept_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept> {};
283 constexpr transwarp::accept_type accept{}; ///< The accept task tag
284 
285 /// The accept_any type. Used for tag dispatch
286 struct accept_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept_any> {};
287 constexpr transwarp::accept_any_type accept_any{}; ///< The accept_any task tag
288 
289 /// The consume type. Used for tag dispatch
290 struct consume_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume> {};
291 constexpr transwarp::consume_type consume{}; ///< The consume task tag
292 
293 /// The consume_any type. Used for tag dispatch
294 struct consume_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume_any> {};
295 constexpr transwarp::consume_any_type consume_any{}; ///< The consume_any task tag
296 
297 /// The wait type. Used for tag dispatch
298 struct wait_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait> {};
299 constexpr transwarp::wait_type wait{}; ///< The wait task tag
300 
301 /// The wait_any type. Used for tag dispatch
302 struct wait_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait_any> {};
303 constexpr transwarp::wait_any_type wait_any{}; ///< The wait_any task tag
304 
305 
306 class itask;
307 
308 
309 /// Detail namespace for internal functionality only
310 namespace detail {
311 
312 struct visit_visitor;
313 struct unvisit_visitor;
314 struct final_visitor;
315 struct schedule_visitor;
316 struct parent_visitor;
317 struct decrement_refcount_functor;
318 
319 } // detail
320 
321 
322 /// The executor interface used to perform custom task execution
323 class executor {
324 public:
325     virtual ~executor() = default;
326 
327     /// Returns the name of the executor
328     virtual std::string name() const = 0;
329 
330     /// Runs a task which is wrapped by the given functor. The functor only
331     /// captures one shared pointer and can hence be copied at low cost.
332     /// task represents the task that the functor belongs to.
333     /// This function is only ever called on the thread of the caller to schedule().
334     /// The implementer needs to ensure that this never throws exceptions
335     virtual void execute(const std::function<void()>& functor, transwarp::itask& task) = 0;
336 };
337 
338 
/// Events raised by tasks; subscribe to them through the listener interface.
/// Each entry states the thread on which handle_event is called.
enum class event_type {
    before_scheduled,      ///< Right before a task is scheduled (thread of the caller to schedule())
    after_future_changed,  ///< Right after the task's future was changed (thread that changed the future)
    before_started,        ///< Right before a task starts running (thread the task runs on)
    before_invoked,        ///< Right before a task's functor is invoked (thread the task runs on)
    after_finished,        ///< Right after a task has finished running (thread the task runs on)
    after_canceled,        ///< Right after a task was canceled (thread the task runs on)
    after_satisfied,       ///< Right after a task has satisfied all its children with results (thread where the last child is satisfied)
    after_custom_data_set, ///< Right after custom data was assigned (thread the custom data was set on)
    count,                 ///< Equals the number of enumerators above; presumably a sentinel, not an event
};
351 
352 
353 /// The listener interface to listen to events raised by tasks
354 class listener {
355 public:
356     virtual ~listener() = default;
357 
358     /// This may be called from arbitrary threads depending on the event type (see transwarp::event_type).
359     /// The implementer needs to ensure that this never throws exceptions.
360     virtual void handle_event(transwarp::event_type event, transwarp::itask& task) = 0;
361 };
362 
363 
364 /// An edge between two tasks
365 class edge {
366 public:
edge(transwarp::itask & parent,transwarp::itask & child)367     edge(transwarp::itask& parent, transwarp::itask& child) noexcept
368     : parent_(&parent), child_(&child)
369     {}
370 
371     // default copy/move semantics
372     edge(const edge&) = default;
373     edge& operator=(const edge&) = default;
374     edge(edge&&) = default;
375     edge& operator=(edge&&) = default;
376 
377     /// Returns the parent task
parent()378     const transwarp::itask& parent() const noexcept {
379         return *parent_;
380     }
381 
382     /// Returns the parent task
parent()383     transwarp::itask& parent() noexcept {
384         return *parent_;
385     }
386 
387     /// Returns the child task
child()388     const transwarp::itask& child() const noexcept {
389         return *child_;
390     }
391 
392     /// Returns the child task
child()393     transwarp::itask& child() noexcept {
394         return *child_;
395     }
396 
397 private:
398     transwarp::itask* parent_;
399     transwarp::itask* child_;
400 };
401 
402 
403 class timer;
404 class releaser;
405 
406 /// An interface for the task class
407 class itask : public std::enable_shared_from_this<itask> {
408 public:
409     virtual ~itask() = default;
410 
411     virtual void finalize() = 0;
412     virtual std::size_t id() const noexcept = 0;
413     virtual std::size_t level() const noexcept = 0;
414     virtual transwarp::task_type type() const noexcept = 0;
415     virtual const transwarp::option_str& name() const noexcept = 0;
416     virtual std::shared_ptr<transwarp::executor> executor() const noexcept = 0;
417     virtual std::int64_t priority() const noexcept = 0;
418     virtual const transwarp::any_data& custom_data() const noexcept = 0;
419     virtual bool canceled() const noexcept = 0;
420     virtual std::int64_t avg_idletime_us() const noexcept = 0;
421     virtual std::int64_t avg_waittime_us() const noexcept = 0;
422     virtual std::int64_t avg_runtime_us() const noexcept = 0;
423     virtual void set_executor(std::shared_ptr<transwarp::executor> executor) = 0;
424     virtual void set_executor_all(std::shared_ptr<transwarp::executor> executor) = 0;
425     virtual void remove_executor() = 0;
426     virtual void remove_executor_all() = 0;
427     virtual void set_priority(std::int64_t priority) = 0;
428     virtual void set_priority_all(std::int64_t priority) = 0;
429     virtual void reset_priority() = 0;
430     virtual void reset_priority_all() = 0;
431     virtual void set_custom_data(transwarp::any_data custom_data) = 0;
432     virtual void set_custom_data_all(transwarp::any_data custom_data) = 0;
433     virtual void remove_custom_data() = 0;
434     virtual void remove_custom_data_all() = 0;
435     virtual void add_listener(std::shared_ptr<transwarp::listener> listener) = 0;
436     virtual void add_listener(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
437     virtual void add_listener_all(std::shared_ptr<transwarp::listener> listener) = 0;
438     virtual void add_listener_all(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
439     virtual void remove_listener(const std::shared_ptr<transwarp::listener>& listener) = 0;
440     virtual void remove_listener(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) = 0;
441     virtual void remove_listener_all(const std::shared_ptr<transwarp::listener>& listener) = 0;
442     virtual void remove_listener_all(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) = 0;
443     virtual void remove_listeners() = 0;
444     virtual void remove_listeners(transwarp::event_type event) = 0;
445     virtual void remove_listeners_all() = 0;
446     virtual void remove_listeners_all(transwarp::event_type event) = 0;
447     virtual void schedule() = 0;
448     virtual void schedule(transwarp::executor& executor) = 0;
449     virtual void schedule(bool reset) = 0;
450     virtual void schedule(transwarp::executor& executor, bool reset) = 0;
451     virtual void schedule_all() = 0;
452     virtual void schedule_all(transwarp::executor& executor) = 0;
453     virtual void schedule_all(bool reset_all) = 0;
454     virtual void schedule_all(transwarp::executor& executor, bool reset_all) = 0;
455     virtual void set_exception(std::exception_ptr exception) = 0;
456     virtual bool was_scheduled() const noexcept = 0;
457     virtual void wait() const = 0;
458     virtual bool is_ready() const = 0;
459     virtual bool has_result() const = 0;
460     virtual void reset() = 0;
461     virtual void reset_all() = 0;
462     virtual void cancel(bool enabled) noexcept = 0;
463     virtual void cancel_all(bool enabled) noexcept = 0;
464     virtual std::vector<itask*> parents() const = 0;
465     virtual const std::vector<itask*>& tasks() = 0;
466     virtual std::vector<transwarp::edge> edges() = 0;
467 
468 protected:
469     virtual void schedule_impl(bool reset, transwarp::executor* executor=nullptr) = 0;
470 
471 private:
472     friend struct transwarp::detail::visit_visitor;
473     friend struct transwarp::detail::unvisit_visitor;
474     friend struct transwarp::detail::final_visitor;
475     friend struct transwarp::detail::schedule_visitor;
476     friend struct transwarp::detail::parent_visitor;
477     friend class transwarp::timer;
478     friend class transwarp::releaser;
479     friend struct transwarp::detail::decrement_refcount_functor;
480 
481     virtual void visit(const std::function<void(itask&)>& visitor) = 0;
482     virtual void unvisit() noexcept = 0;
483     virtual void set_id(std::size_t id) noexcept = 0;
484     virtual void set_level(std::size_t level) noexcept = 0;
485     virtual void set_type(transwarp::task_type type) noexcept = 0;
486     virtual void set_name(transwarp::option_str name) noexcept = 0;
487     virtual void set_avg_idletime_us(std::int64_t idletime) noexcept = 0;
488     virtual void set_avg_waittime_us(std::int64_t waittime) noexcept = 0;
489     virtual void set_avg_runtime_us(std::int64_t runtime) noexcept = 0;
490     virtual void increment_childcount() noexcept = 0;
491     virtual void decrement_refcount() = 0;
492     virtual void reset_future() = 0;
493 };
494 
495 
496 /// String conversion for the task_type enumeration
497 inline
to_string(const transwarp::task_type & type)498 std::string to_string(const transwarp::task_type& type) {
499     switch (type) {
500     case transwarp::task_type::root: return "root";
501     case transwarp::task_type::accept: return "accept";
502     case transwarp::task_type::accept_any: return "accept_any";
503     case transwarp::task_type::consume: return "consume";
504     case transwarp::task_type::consume_any: return "consume_any";
505     case transwarp::task_type::wait: return "wait";
506     case transwarp::task_type::wait_any: return "wait_any";
507     }
508     throw transwarp::invalid_parameter{"task type"};
509 }
510 
511 
512 /// String conversion for the itask class
513 inline
514 std::string to_string(const transwarp::itask& task, transwarp::str_view separator="\n") {
515     std::string s;
516     s += '"';
517     const transwarp::option_str& name = task.name();
518     if (name) {
519         s += std::string{"<"} + *name + std::string{">"} + separator.data();
520     }
521     s += transwarp::to_string(task.type());
522     s += std::string{" id="} + std::to_string(task.id());
523     s += std::string{" lev="} + std::to_string(task.level());
524     const std::shared_ptr<transwarp::executor> exec = task.executor();
525     if (exec) {
526         s += separator.data() + std::string{"<"} + exec->name() + std::string{">"};
527     }
528     const std::int64_t avg_idletime_us = task.avg_idletime_us();
529     if (avg_idletime_us >= 0) {
530         s += separator.data() + std::string{"avg-idle-us="} + std::to_string(avg_idletime_us);
531     }
532     const std::int64_t avg_waittime_us = task.avg_waittime_us();
533     if (avg_waittime_us >= 0) {
534         s += separator.data() + std::string{"avg-wait-us="} + std::to_string(avg_waittime_us);
535     }
536     const std::int64_t avg_runtime_us = task.avg_runtime_us();
537     if (avg_runtime_us >= 0) {
538         s += separator.data() + std::string{"avg-run-us="} + std::to_string(avg_runtime_us);
539     }
540     return s + '"';
541 }
542 
543 
544 /// String conversion for the edge class
545 inline
546 std::string to_string(const transwarp::edge& edge, transwarp::str_view separator="\n") {
547     return transwarp::to_string(edge.parent(), separator) + std::string{" -> "} + transwarp::to_string(edge.child(), separator);
548 }
549 
550 
551 /// Creates a dot-style string from the given edges
552 inline
553 std::string to_string(const std::vector<transwarp::edge>& edges, transwarp::str_view separator="\n") {
554     std::string dot = std::string{"digraph {"} + separator.data();
555     for (const transwarp::edge& edge : edges) {
556         dot += transwarp::to_string(edge, separator) + separator.data();
557     }
558     dot += std::string{"}"};
559     return dot;
560 }
561 
562 
/// Strips the reference first and then a top-level const from T
/// (volatile, arrays and functions are left as they are)
template<typename T>
struct decay {
private:
    using without_reference = typename std::remove_reference<T>::type;
public:
    using type = typename std::remove_const<without_reference>::type;
};
568 
569 
/// Yields the type returned by std::shared_future<T>::get()
template<typename T>
struct result {
    using type = decltype(std::declval<const std::shared_future<T>&>().get());
};
575 
576 
577 /// Detail namespace for internal functionality only
578 namespace detail {
579 
580 /// Clones a task
581 template<typename TaskType>
clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>,std::shared_ptr<transwarp::itask>> & task_cache,const std::shared_ptr<TaskType> & t)582 std::shared_ptr<TaskType> clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<TaskType>& t) {
583     const auto original_task = std::static_pointer_cast<transwarp::itask>(t);
584     const auto task_cache_it = task_cache.find(original_task);
585     if (task_cache_it != task_cache.cend()) {
586         return std::static_pointer_cast<TaskType>(task_cache_it->second);
587     } else {
588         auto cloned_task = t->clone_impl(task_cache);
589         task_cache[original_task] = cloned_task;
590         return cloned_task;
591     }
592 }
593 
594 } // detail
595 
596 
597 /// The task class
598 template<typename ResultType>
599 class task : public transwarp::itask {
600 public:
601     using result_type = ResultType;
602 
603     virtual ~task() = default;
604 
clone()605     std::shared_ptr<task> clone() const {
606         std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
607         return clone_impl(task_cache);
608     }
609 
610     virtual void set_value(const typename transwarp::decay<result_type>::type& value) = 0;
611     virtual void set_value(typename transwarp::decay<result_type>::type&& value) = 0;
612     virtual std::shared_future<result_type> future() const noexcept = 0;
613     virtual typename transwarp::result<result_type>::type get() const = 0;
614 
615 private:
616     template<typename T>
617     friend std::shared_ptr<T> transwarp::detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<T>& t);
618 
619     virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const = 0;
620 };
621 
622 /// The task class (reference result type)
623 template<typename ResultType>
624 class task<ResultType&> : public transwarp::itask {
625 public:
626     using result_type = ResultType&;
627 
628     virtual ~task() = default;
629 
clone()630     std::shared_ptr<task> clone() const {
631         std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
632         return clone_impl(task_cache);
633     }
634 
635     virtual void set_value(typename transwarp::decay<result_type>::type& value) = 0;
636     virtual std::shared_future<result_type> future() const noexcept = 0;
637     virtual typename transwarp::result<result_type>::type get() const = 0;
638 
639 private:
640     template<typename T>
641     friend std::shared_ptr<T> transwarp::detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<T>& t);
642 
643     virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const = 0;
644 };
645 
646 /// The task class (void result type)
647 template<>
648 class task<void> : public transwarp::itask {
649 public:
650     using result_type = void;
651 
652     virtual ~task() = default;
653 
clone()654     std::shared_ptr<task> clone() const {
655         std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
656         return clone_impl(task_cache);
657     }
658 
659     virtual void set_value() = 0;
660     virtual std::shared_future<result_type> future() const noexcept = 0;
661     virtual result_type get() const = 0;
662 
663 private:
664     template<typename T>
665     friend std::shared_ptr<T> transwarp::detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<T>& t);
666 
667     virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const = 0;
668 };
669 
670 
671 /// Detail namespace for internal functionality only
672 namespace detail {
673 
674 template<bool>
675 struct assign_task_if_impl;
676 
677 } // detail
678 
679 
680 /// A base class for a user-defined functor that needs access to the associated
681 /// task or a cancel point to stop a task while it's running
682 class functor {
683 public:
684 
685     virtual ~functor() = default;
686 
687 protected:
688 
689     /// The associated task (only to be called after the task was constructed)
transwarp_task()690     const transwarp::itask& transwarp_task() const noexcept {
691         return *transwarp_task_;
692     }
693 
694     /// The associated task (only to be called after the task was constructed)
transwarp_task()695     transwarp::itask& transwarp_task() noexcept {
696         return *transwarp_task_;
697     }
698 
699     /// If the associated task is canceled then this will throw transwarp::task_canceled
700     /// which will stop the task while it's running (only to be called after the task was constructed)
transwarp_cancel_point()701     void transwarp_cancel_point() const {
702         if (transwarp_task_->canceled()) {
703             throw transwarp::task_canceled{std::to_string(transwarp_task_->id())};
704         }
705     }
706 
707 private:
708     template<bool>
709     friend struct transwarp::detail::assign_task_if_impl;
710 
711     transwarp::itask* transwarp_task_{};
712 };
713 
714 
715 /// Detail namespace for internal functionality only
716 namespace detail {
717 
718 
719 /// A simple thread pool used to execute tasks in parallel
720 class thread_pool {
721 public:
722 
723     explicit thread_pool(const std::size_t n_threads,
724                          std::function<void(std::size_t thread_index)> on_thread_started = nullptr)
725     : on_thread_started_{std::move(on_thread_started)}
726     {
727         if (n_threads == 0) {
728             throw transwarp::invalid_parameter{"number of threads"};
729         }
730         for (std::size_t i = 0; i < n_threads; ++i) {
731             std::thread thread;
732             try {
733                 thread = std::thread(&thread_pool::worker, this, i);
catch(...)734             } catch (...) {
735                 shutdown();
736                 throw;
737             }
738             try {
739                 threads_.push_back(std::move(thread));
catch(...)740             } catch (...) {
741                 shutdown();
742                 thread.join();
743                 throw;
744             }
745         }
746     }
747 
748     // delete copy/move semantics
749     thread_pool(const thread_pool&) = delete;
750     thread_pool& operator=(const thread_pool&) = delete;
751     thread_pool(thread_pool&&) = delete;
752     thread_pool& operator=(thread_pool&&) = delete;
753 
~thread_pool()754     ~thread_pool() {
755         shutdown();
756     }
757 
push(const std::function<void ()> & functor)758     void push(const std::function<void()>& functor) {
759         {
760             std::lock_guard<std::mutex> lock{mutex_};
761             functors_.push(functor);
762         }
763         cond_var_.notify_one();
764     }
765 
766 private:
767 
worker(const std::size_t index)768     void worker(const std::size_t index) {
769         if (on_thread_started_) {
770             on_thread_started_(index);
771         }
772         for (;;) {
773             std::function<void()> functor;
774             {
775                 std::unique_lock<std::mutex> lock{mutex_};
776                 cond_var_.wait(lock, [this]{
777                     return done_ || !functors_.empty();
778                 });
779                 if (done_ && functors_.empty()) {
780                     break;
781                 }
782                 functor = std::move(functors_.front());
783                 functors_.pop();
784             }
785             functor();
786         }
787     }
788 
shutdown()789     void shutdown() {
790         {
791             std::lock_guard<std::mutex> lock{mutex_};
792             done_ = true;
793         }
794         cond_var_.notify_all();
795         for (std::thread& thread : threads_) {
796             thread.join();
797         }
798         threads_.clear();
799     }
800 
801     bool done_ = false;
802     std::function<void(std::size_t)> on_thread_started_;
803     std::vector<std::thread> threads_;
804     std::queue<std::function<void()>> functors_;
805     std::condition_variable cond_var_;
806     std::mutex mutex_;
807 };
808 
809 
810 #ifdef TRANSWARP_CPP11
811 template<std::size_t...> struct indices {};
812 
813 template<std::size_t...> struct construct_range;
814 
815 template<std::size_t end, std::size_t idx, std::size_t... i>
816 struct construct_range<end, idx, i...> : construct_range<end, idx + 1, i..., idx> {};
817 
818 template<std::size_t end, std::size_t... i>
819 struct construct_range<end, end, i...> {
820     using type = transwarp::detail::indices<i...>;
821 };
822 
823 template<std::size_t b, std::size_t e>
824 struct index_range {
825     using type = typename transwarp::detail::construct_range<e, b>::type;
826 };
827 
828 template<typename Functor, typename Tuple>
829 void call_with_each_index(transwarp::detail::indices<>, Functor&&, Tuple&&) {}
830 
831 template<std::size_t i, std::size_t... j, typename Functor, typename Tuple>
832 void call_with_each_index(transwarp::detail::indices<i, j...>, Functor&& f, Tuple&& t) {
833     f(std::get<i>(t));
834     transwarp::detail::call_with_each_index(transwarp::detail::indices<j...>{}, std::forward<Functor>(f), std::forward<Tuple>(t));
835 }
836 #endif
837 
/// Calls the functor once with every element of the tuple, in element order.
/// The value returned by the functor is discarded in both the C++11 and the C++17 code path.
template<typename Functor, typename Tuple>
void apply_to_each(Functor&& f, Tuple&& t) {
#ifdef TRANSWARP_CPP11
    constexpr std::size_t n = std::tuple_size<typename std::decay<Tuple>::type>::value;
    using index_t = typename transwarp::detail::index_range<0, n>::type;
    transwarp::detail::call_with_each_index(index_t{}, std::forward<Functor>(f), std::forward<Tuple>(t));
#else
    std::apply([&f](auto&&... arg) {
        // The cast to void guarantees that the built-in comma operator is used by the fold,
        // even if the functor returns a type that overloads operator,
        (..., static_cast<void>(std::forward<Functor>(f)(std::forward<decltype(arg)>(arg))));
    }, std::forward<Tuple>(t));
#endif
}
848 
/// Calls a copy of the functor once with every element of the const vector, front to back
template<typename Functor, typename ElementType>
void apply_to_each(Functor&& f, const std::vector<ElementType>& v) {
    // The functor is copied (or moved) once and the copy is invoked, just like std::for_each does
    typename std::decay<Functor>::type callable(std::forward<Functor>(f));
    for (auto it = v.begin(); it != v.end(); ++it) {
        callable(*it);
    }
}
853 
/// Calls a copy of the functor once with every element of the mutable vector, front to back
template<typename Functor, typename ElementType>
void apply_to_each(Functor&& f, std::vector<ElementType>& v) {
    // The functor is copied (or moved) once and the copy is invoked, just like std::for_each does
    typename std::decay<Functor>::type callable(std::forward<Functor>(f));
    for (auto it = v.begin(); it != v.end(); ++it) {
        callable(*it);
    }
}
858 
859 
860 template<int offset, typename... ParentResults>
861 struct assign_futures_impl {
862     static void work(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& source, std::tuple<std::shared_future<ParentResults>...>& target) {
863         std::get<offset>(target) = std::get<offset>(source)->future();
864         assign_futures_impl<offset - 1, ParentResults...>::work(source, target);
865     }
866 };
867 
868 template<typename... ParentResults>
869 struct assign_futures_impl<-1, ParentResults...> {
870     static void work(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>&, std::tuple<std::shared_future<ParentResults>...>&) {}
871 };
872 
873 /// Returns the futures from the given tuple of tasks
874 template<typename... ParentResults>
875 std::tuple<std::shared_future<ParentResults>...> get_futures(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& input) {
876     std::tuple<std::shared_future<ParentResults>...> result;
877     assign_futures_impl<static_cast<int>(sizeof...(ParentResults)) - 1, ParentResults...>::work(input, result);
878     return result;
879 }
880 
881 /// Returns the futures from the given vector of tasks
882 template<typename ParentResultType>
883 std::vector<std::shared_future<ParentResultType>> get_futures(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& input) {
884     std::vector<std::shared_future<ParentResultType>> result;
885     result.reserve(input.size());
886     for (const std::shared_ptr<transwarp::task<ParentResultType>>& task : input) {
887         result.emplace_back(task->future());
888     }
889     return result;
890 }
891 
892 
893 /// Runs the task with the given arguments, hence, invoking the task's functor
894 template<typename Result, typename Task, typename... Args>
895 Result run_task(std::size_t task_id, const std::weak_ptr<Task>& task, Args&&... args) {
896     const std::shared_ptr<Task> t = task.lock();
897     if (!t) {
898         throw transwarp::task_destroyed{std::to_string(task_id)};
899     }
900     if (t->canceled()) {
901         throw transwarp::task_canceled{std::to_string(task_id)};
902     }
903     t->raise_event(transwarp::event_type::before_invoked);
904     return (*t->functor_)(std::forward<Args>(args)...);
905 }
906 
907 
/// Functor that calls wait() on the future of the given parent
struct wait_for_all_functor {
    template<typename ParentPtr>
    void operator()(const ParentPtr& parent) const {
        auto&& future = parent->future();
        future.wait();
    }
};
914 
915 /// Waits for all parents to finish
916 template<typename... ParentResults>
917 void wait_for_all(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
918     transwarp::detail::apply_to_each(transwarp::detail::wait_for_all_functor{}, parents);
919 }
920 
921 /// Waits for all parents to finish
922 template<typename ParentResultType>
923 void wait_for_all(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
924     transwarp::detail::apply_to_each(transwarp::detail::wait_for_all_functor{}, parents);
925 }
926 
927 
/// End of the recursion: no parent is left to check, so a value-initialized Parent is returned
template<typename Parent>
Parent wait_for_any_impl() {
    return {};
}
932 
933 template<typename Parent, typename ParentResult, typename... ParentResults>
934 Parent wait_for_any_impl(const std::shared_ptr<transwarp::task<ParentResult>>& parent, const std::shared_ptr<transwarp::task<ParentResults>>& ...parents) {
935     const std::future_status status = parent->future().wait_for(std::chrono::microseconds(1));
936     if (status == std::future_status::ready) {
937         return parent;
938     }
939     return transwarp::detail::wait_for_any_impl<Parent>(parents...);
940 }
941 
942 /// Waits for the first parent to finish
943 template<typename Parent, typename... ParentResults>
944 Parent wait_for_any(const std::shared_ptr<transwarp::task<ParentResults>>& ...parents) {
945     for (;;) {
946         Parent parent = transwarp::detail::wait_for_any_impl<Parent>(parents...);
947         if (parent) {
948             return parent;
949         }
950     }
951 }
952 
953 
954 /// Waits for the first parent to finish
955 template<typename ParentResultType>
956 std::shared_ptr<transwarp::task<ParentResultType>> wait_for_any(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
957     for (;;) {
958         for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
959             const std::future_status status = parent->future().wait_for(std::chrono::microseconds(1));
960             if (status == std::future_status::ready) {
961                 return parent;
962             }
963         }
964     }
965 }
966 
967 
968 template<typename OneResult>
969 struct cancel_all_but_one_functor {
970     explicit cancel_all_but_one_functor(const std::shared_ptr<transwarp::task<OneResult>>& one) noexcept
971     : one_(one) {}
972 
973     template<typename T>
974     void operator()(const T& parent) const {
975         if (one_ != parent) {
976             parent->cancel(true);
977         }
978     }
979 
980     const std::shared_ptr<transwarp::task<OneResult>>& one_;
981 };
982 
983 /// Cancels all tasks but one
984 template<typename OneResult, typename... ParentResults>
985 void cancel_all_but_one(const std::shared_ptr<transwarp::task<OneResult>>& one, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
986     transwarp::detail::apply_to_each(transwarp::detail::cancel_all_but_one_functor<OneResult>{one}, parents);
987 }
988 
989 /// Cancels all tasks but one
990 template<typename OneResult, typename ParentResultType>
991 void cancel_all_but_one(const std::shared_ptr<transwarp::task<OneResult>>& one, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
992     transwarp::detail::apply_to_each(transwarp::detail::cancel_all_but_one_functor<OneResult>{one}, parents);
993 }
994 
995 
/// Functor that calls decrement_refcount() on the given task
struct decrement_refcount_functor {
    template<typename TaskPtr>
    void operator()(const TaskPtr& task) const {
        (*task).decrement_refcount();
    }
};
1002 
1003 /// Decrements the refcount of all parents
1004 template<typename... ParentResults>
1005 void decrement_refcount(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1006     transwarp::detail::apply_to_each(transwarp::detail::decrement_refcount_functor{}, parents);
1007 }
1008 
1009 /// Decrements the refcount of all parents
1010 template<typename ParentResultType>
1011 void decrement_refcount(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1012     transwarp::detail::apply_to_each(transwarp::detail::decrement_refcount_functor{}, parents);
1013 }
1014 
1015 
1016 template<typename TaskType, bool done, int total, int... n>
1017 struct call_impl {
1018     template<typename Result, typename Task, typename... ParentResults>
1019     static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1020         return call_impl<TaskType, total == 1 + static_cast<int>(sizeof...(n)), total, n..., static_cast<int>(sizeof...(n))>::template
1021                 work<Result>(task_id, task, parents);
1022     }
1023 };
1024 
/// Declaration only; a full specialization is provided per task type and
/// handles parents that are given as a vector
template<typename TaskType>
struct call_impl_vector;
1027 
1028 template<int total, int... n>
1029 struct call_impl<transwarp::root_type, true, total, n...> {
1030     template<typename Result, typename Task, typename... ParentResults>
1031     static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>&) {
1032         return transwarp::detail::run_task<Result>(task_id, task);
1033     }
1034 };
1035 
1036 template<>
1037 struct call_impl_vector<transwarp::root_type> {
1038     template<typename Result, typename Task, typename ParentResultType>
1039     static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>&) {
1040         return transwarp::detail::run_task<Result>(task_id, task);
1041     }
1042 };
1043 
1044 template<int total, int... n>
1045 struct call_impl<transwarp::accept_type, true, total, n...> {
1046     template<typename Result, typename Task, typename... ParentResults>
1047     static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1048         transwarp::detail::wait_for_all(parents);
1049         const std::tuple<std::shared_future<ParentResults>...> futures = transwarp::detail::get_futures(parents);
1050         transwarp::detail::decrement_refcount(parents);
1051         return transwarp::detail::run_task<Result>(task_id, task, std::get<n>(futures)...);
1052     }
1053 };
1054 
1055 template<>
1056 struct call_impl_vector<transwarp::accept_type> {
1057     template<typename Result, typename Task, typename ParentResultType>
1058     static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1059         transwarp::detail::wait_for_all(parents);
1060         std::vector<std::shared_future<ParentResultType>> futures = transwarp::detail::get_futures(parents);
1061         transwarp::detail::decrement_refcount(parents);
1062         return transwarp::detail::run_task<Result>(task_id, task, std::move(futures));
1063     }
1064 };
1065 
1066 template<int total, int... n>
1067 struct call_impl<transwarp::accept_any_type, true, total, n...> {
1068     template<typename Result, typename Task, typename... ParentResults>
1069     static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1070         using parent_t = typename std::remove_reference<decltype(std::get<0>(parents))>::type; // Use first type as reference
1071         parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
1072         transwarp::detail::cancel_all_but_one(parent, parents);
1073         auto future = parent->future();
1074         transwarp::detail::decrement_refcount(parents);
1075         return transwarp::detail::run_task<Result>(task_id, task, std::move(future));
1076     }
1077 };
1078 
1079 template<>
1080 struct call_impl_vector<transwarp::accept_any_type> {
1081     template<typename Result, typename Task, typename ParentResultType>
1082     static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1083         std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
1084         transwarp::detail::cancel_all_but_one(parent, parents);
1085         auto future = parent->future();
1086         transwarp::detail::decrement_refcount(parents);
1087         return transwarp::detail::run_task<Result>(task_id, task, std::move(future));
1088     }
1089 };
1090 
1091 template<int total, int... n>
1092 struct call_impl<transwarp::consume_type, true, total, n...> {
1093     template<typename Result, typename Task, typename... ParentResults>
1094     static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1095         transwarp::detail::wait_for_all(parents);
1096         const std::tuple<std::shared_future<ParentResults>...> futures = transwarp::detail::get_futures(parents);
1097         transwarp::detail::decrement_refcount(parents);
1098         return transwarp::detail::run_task<Result>(task_id, task, std::get<n>(futures).get()...);
1099     }
1100 };
1101 
1102 template<>
1103 struct call_impl_vector<transwarp::consume_type> {
1104     template<typename Result, typename Task, typename ParentResultType>
1105     static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1106         transwarp::detail::wait_for_all(parents);
1107         const std::vector<std::shared_future<ParentResultType>> futures = transwarp::detail::get_futures(parents);
1108         transwarp::detail::decrement_refcount(parents);
1109         std::vector<ParentResultType> results;
1110         results.reserve(futures.size());
1111         for (const std::shared_future<ParentResultType>& future : futures) {
1112             results.emplace_back(future.get());
1113         }
1114         return transwarp::detail::run_task<Result>(task_id, task, std::move(results));
1115     }
1116 };
1117 
1118 template<int total, int... n>
1119 struct call_impl<transwarp::consume_any_type, true, total, n...> {
1120     template<typename Result, typename Task, typename... ParentResults>
1121     static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1122         using parent_t = typename std::remove_reference<decltype(std::get<0>(parents))>::type; /// Use first type as reference
1123         parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
1124         transwarp::detail::cancel_all_but_one(parent, parents);
1125         const auto future = parent->future();
1126         transwarp::detail::decrement_refcount(parents);
1127         return transwarp::detail::run_task<Result>(task_id, task, future.get());
1128     }
1129 };
1130 
1131 template<>
1132 struct call_impl_vector<transwarp::consume_any_type> {
1133     template<typename Result, typename Task, typename ParentResultType>
1134     static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1135         std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
1136         transwarp::detail::cancel_all_but_one(parent, parents);
1137         const auto future = parent->future();
1138         transwarp::detail::decrement_refcount(parents);
1139         return transwarp::detail::run_task<Result>(task_id, task, future.get());
1140     }
1141 };
1142 
/// Functor that calls get() on the given future and discards the result.
/// An exception stored in the future is rethrown by get().
struct future_get_functor {
    template<typename ResultType>
    void operator()(const std::shared_future<ResultType>& future) const {
        static_cast<void>(future.get());
    }
};
1149 
1150 template<int total, int... n>
1151 struct call_impl<transwarp::wait_type, true, total, n...> {
1152     template<typename Result, typename Task, typename... ParentResults>
1153     static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1154         transwarp::detail::wait_for_all(parents);
1155         const std::tuple<std::shared_future<ParentResults>...> futures = transwarp::detail::get_futures(parents);
1156         transwarp::detail::decrement_refcount(parents);
1157         transwarp::detail::apply_to_each(transwarp::detail::future_get_functor{}, futures); // Ensures that exceptions are propagated
1158         return transwarp::detail::run_task<Result>(task_id, task);
1159     }
1160 };
1161 
1162 template<>
1163 struct call_impl_vector<transwarp::wait_type> {
1164     template<typename Result, typename Task, typename ParentResultType>
1165     static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1166         transwarp::detail::wait_for_all(parents);
1167         const std::vector<std::shared_future<ParentResultType>> futures = transwarp::detail::get_futures(parents);
1168         transwarp::detail::decrement_refcount(parents);
1169         transwarp::detail::apply_to_each(transwarp::detail::future_get_functor{}, futures); // Ensures that exceptions are propagated
1170         return transwarp::detail::run_task<Result>(task_id, task);
1171     }
1172 };
1173 
1174 template<int total, int... n>
1175 struct call_impl<transwarp::wait_any_type, true, total, n...> {
1176     template<typename Result, typename Task, typename... ParentResults>
1177     static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1178         using parent_t = typename std::remove_reference<decltype(std::get<0>(parents))>::type; // Use first type as reference
1179         parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
1180         transwarp::detail::cancel_all_but_one(parent, parents);
1181         const auto future = parent->future();
1182         transwarp::detail::decrement_refcount(parents);
1183         future.get(); // Ensures that exceptions are propagated
1184         return transwarp::detail::run_task<Result>(task_id, task);
1185     }
1186 };
1187 
1188 template<>
1189 struct call_impl_vector<transwarp::wait_any_type> {
1190     template<typename Result, typename Task, typename ParentResultType>
1191     static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1192         std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
1193         transwarp::detail::cancel_all_but_one(parent, parents);
1194         const auto future = parent->future();
1195         transwarp::detail::decrement_refcount(parents);
1196         future.get(); // Ensures that exceptions are propagated
1197         return transwarp::detail::run_task<Result>(task_id, task);
1198     }
1199 };
1200 
1201 /// Calls the functor of the given task with the results from the tuple of parents.
1202 /// Throws transwarp::task_canceled if the task is canceled.
1203 /// Throws transwarp::task_destroyed in case the task was destroyed prematurely.
1204 template<typename TaskType, typename Result, typename Task, typename... ParentResults>
1205 Result call(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1206     constexpr std::size_t n = std::tuple_size<std::tuple<std::shared_future<ParentResults>...>>::value;
1207     return transwarp::detail::call_impl<TaskType, 0 == n, static_cast<int>(n)>::template
1208             work<Result>(task_id, task, parents);
1209 }
1210 
1211 /// Calls the functor of the given task with the results from the vector of parents.
1212 /// Throws transwarp::task_canceled if the task is canceled.
1213 /// Throws transwarp::task_destroyed in case the task was destroyed prematurely.
1214 template<typename TaskType, typename Result, typename Task, typename ParentResultType>
1215 Result call(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1216     return transwarp::detail::call_impl_vector<TaskType>::template
1217             work<Result>(task_id, task, parents);
1218 }
1219 
1220 
1221 template<typename Functor>
1222 struct call_with_each_functor {
1223     explicit call_with_each_functor(const Functor& f) noexcept
1224     : f_(f) {}
1225 
1226     template<typename T>
1227     void operator()(const T& task) const {
1228         if (!task) {
1229             throw transwarp::invalid_parameter{"task pointer"};
1230         }
1231         f_(*task);
1232     }
1233 
1234     const Functor& f_;
1235 };
1236 
1237 /// Calls the functor with every element in the tuple
1238 template<typename Functor, typename... ParentResults>
1239 void call_with_each(const Functor& f, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& t) {
1240     transwarp::detail::apply_to_each(transwarp::detail::call_with_each_functor<Functor>{f}, t);
1241 }
1242 
1243 /// Calls the functor with every element in the vector
1244 template<typename Functor, typename ParentResultType>
1245 void call_with_each(const Functor& f, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& v) {
1246     transwarp::detail::apply_to_each(transwarp::detail::call_with_each_functor<Functor>{f}, v);
1247 }
1248 
1249 
1250 /// Sets level of a task and increments the child count
1251 struct parent_visitor {
1252     explicit parent_visitor(transwarp::itask& task) noexcept
1253     : task_(task) {}
1254 
1255     void operator()(transwarp::itask& task) const {
1256         if (task_.level() <= task.level()) {
1257             // A child's level is always larger than any of its parents' levels
1258             task_.set_level(task.level() + 1);
1259         }
1260         task.increment_childcount();
1261     }
1262 
1263     transwarp::itask& task_;
1264 };
1265 
1266 /// Applies final bookkeeping to the task and collects the task
1267 struct final_visitor {
1268     explicit final_visitor(std::vector<transwarp::itask*>& tasks) noexcept
1269     : tasks_(tasks) {}
1270 
1271     void operator()(transwarp::itask& task) noexcept {
1272         tasks_.push_back(&task);
1273         task.set_id(id_++);
1274     }
1275 
1276     std::vector<transwarp::itask*>& tasks_;
1277     std::size_t id_ = 0;
1278 };
1279 
1280 /// Generates edges
1281 struct edges_visitor {
1282     explicit edges_visitor(std::vector<transwarp::edge>& edges) noexcept
1283     : edges_(edges) {}
1284 
1285     void operator()(transwarp::itask& task) {
1286         for (transwarp::itask* parent : task.parents()) {
1287             edges_.emplace_back(*parent, task);
1288         }
1289     }
1290 
1291     std::vector<transwarp::edge>& edges_;
1292 };
1293 
1294 /// Schedules using the given executor
1295 struct schedule_visitor {
1296     schedule_visitor(bool reset, transwarp::executor* executor) noexcept
1297     : reset_(reset), executor_(executor) {}
1298 
1299     void operator()(transwarp::itask& task) {
1300         task.schedule_impl(reset_, executor_);
1301     }
1302 
1303     bool reset_;
1304     transwarp::executor* executor_;
1305 };
1306 
1307 /// Resets the given task
1308 struct reset_visitor {
1309 
1310     void operator()(transwarp::itask& task) const {
1311         task.reset();
1312     }
1313 };
1314 
1315 /// Cancels or resumes the given task
1316 struct cancel_visitor {
1317     explicit cancel_visitor(bool enabled) noexcept
1318     : enabled_{enabled} {}
1319 
1320     void operator()(transwarp::itask& task) const noexcept {
1321         task.cancel(enabled_);
1322     }
1323 
1324     bool enabled_;
1325 };
1326 
1327 /// Assigns an executor to the given task
1328 struct set_executor_visitor {
1329     explicit set_executor_visitor(std::shared_ptr<transwarp::executor> executor) noexcept
1330     : executor_{std::move(executor)} {}
1331 
1332     void operator()(transwarp::itask& task) const noexcept {
1333         task.set_executor(executor_);
1334     }
1335 
1336     std::shared_ptr<transwarp::executor> executor_;
1337 };
1338 
1339 /// Removes the executor from the given task
1340 struct remove_executor_visitor {
1341 
1342     void operator()(transwarp::itask& task) const noexcept {
1343         task.remove_executor();
1344     }
1345 };
1346 
1347 /// Assigns a priority to the given task
1348 struct set_priority_visitor {
1349     explicit set_priority_visitor(std::int64_t priority) noexcept
1350     : priority_{priority} {}
1351 
1352     void operator()(transwarp::itask& task) const noexcept {
1353         task.set_priority(priority_);
1354     }
1355 
1356     std::int64_t priority_;
1357 };
1358 
1359 /// Resets the priority of the given task
1360 struct reset_priority_visitor {
1361 
1362     void operator()(transwarp::itask& task) const noexcept {
1363         task.reset_priority();
1364     }
1365 };
1366 
1367 /// Assigns custom data to the given task
1368 struct set_custom_data_visitor {
1369     explicit set_custom_data_visitor(transwarp::any_data custom_data) noexcept
1370     : custom_data_{std::move(custom_data)} {}
1371 
1372     void operator()(transwarp::itask& task) const noexcept {
1373         task.set_custom_data(custom_data_);
1374     }
1375 
1376     transwarp::any_data custom_data_;
1377 };
1378 
1379 /// Removes custom data from the given task
1380 struct remove_custom_data_visitor {
1381 
1382     void operator()(transwarp::itask& task) const noexcept {
1383         task.remove_custom_data();
1384     }
1385 };
1386 
1387 /// Pushes the given task into the vector of tasks
1388 struct push_task_visitor {
1389     explicit push_task_visitor(std::vector<transwarp::itask*>& tasks)
1390     : tasks_(tasks) {}
1391 
1392     void operator()(transwarp::itask& task) {
1393         tasks_.push_back(&task);
1394     }
1395 
1396     std::vector<transwarp::itask*>& tasks_;
1397 };
1398 
1399 /// Adds a new listener to the given task
1400 struct add_listener_visitor {
1401     explicit add_listener_visitor(std::shared_ptr<transwarp::listener> listener)
1402     : listener_(std::move(listener))
1403     {}
1404 
1405     void operator()(transwarp::itask& task) {
1406         task.add_listener(listener_);
1407     }
1408 
1409     std::shared_ptr<transwarp::listener> listener_;
1410 };
1411 
1412 /// Adds a new listener per event type to the given task
1413 struct add_listener_per_event_visitor {
1414     add_listener_per_event_visitor(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener)
1415     : event_(event), listener_(std::move(listener))
1416     {}
1417 
1418     void operator()(transwarp::itask& task) {
1419         task.add_listener(event_, listener_);
1420     }
1421 
1422     transwarp::event_type event_;
1423     std::shared_ptr<transwarp::listener> listener_;
1424 };
1425 
1426 /// Removes a listener from the given task
1427 struct remove_listener_visitor {
1428     explicit remove_listener_visitor(std::shared_ptr<transwarp::listener> listener)
1429     : listener_(std::move(listener))
1430     {}
1431 
1432     void operator()(transwarp::itask& task) {
1433         task.remove_listener(listener_);
1434     }
1435 
1436     std::shared_ptr<transwarp::listener> listener_;
1437 };
1438 
1439 /// Removes a listener per event type from the given task
1440 struct remove_listener_per_event_visitor {
1441     remove_listener_per_event_visitor(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener)
1442     : event_(event), listener_(std::move(listener))
1443     {}
1444 
1445     void operator()(transwarp::itask& task) {
1446         task.remove_listener(event_, listener_);
1447     }
1448 
1449     transwarp::event_type event_;
1450     std::shared_ptr<transwarp::listener> listener_;
1451 };
1452 
1453 /// Removes all listeners from the given task
1454 struct remove_listeners_visitor {
1455 
1456     void operator()(transwarp::itask& task) {
1457         task.remove_listeners();
1458     }
1459 
1460 };
1461 
1462 /// Removes all listeners per event type from the given task
1463 struct remove_listeners_per_event_visitor {
1464     explicit remove_listeners_per_event_visitor(transwarp::event_type event)
1465     : event_(event)
1466     {}
1467 
1468     void operator()(transwarp::itask& task) {
1469         task.remove_listeners(event_);
1470     }
1471 
1472     transwarp::event_type event_;
1473 };
1474 
1475 /// Visits the given task using the visitor given in the constructor
1476 struct visit_visitor {
1477     explicit visit_visitor(const std::function<void(transwarp::itask&)>& visitor) noexcept
1478     : visitor_(visitor) {}
1479 
1480     void operator()(transwarp::itask& task) const {
1481         task.visit(visitor_);
1482     }
1483 
1484     const std::function<void(transwarp::itask&)>& visitor_;
1485 };
1486 
1487 /// Unvisits the given task
1488 struct unvisit_visitor {
1489 
1490     void operator()(transwarp::itask& task) const noexcept {
1491         task.unvisit();
1492     }
1493 };
1494 
/// Determines the result type of the Functor dispatching on the task type.
/// The primary template only rejects unknown task types at compile time and
/// defines no nested type.
template<typename TaskType, typename Functor, typename... ParentResults>
struct functor_result {
    static_assert(std::is_same<TaskType, transwarp::root_type>::value ||
                  std::is_same<TaskType, transwarp::accept_type>::value ||
                  std::is_same<TaskType, transwarp::accept_any_type>::value ||
                  std::is_same<TaskType, transwarp::consume_type>::value ||
                  std::is_same<TaskType, transwarp::consume_any_type>::value ||
                  std::is_same<TaskType, transwarp::wait_type>::value ||
                  std::is_same<TaskType, transwarp::wait_any_type>::value,
                  "Invalid task type, must be one of: root, accept, accept_any, consume, consume_any, wait, wait_any");
};
1507 
/// root: the result type of calling the functor without arguments.
/// Parent results are rejected at compile time.
template<typename Functor, typename... ParentResults>
struct functor_result<transwarp::root_type, Functor, ParentResults...> {
    static_assert(sizeof...(ParentResults) == 0, "A root task cannot have parent tasks");
    using type = decltype(std::declval<Functor>()());
};
1513 
/// accept: the result type of calling the functor with one shared_future per parent result
template<typename Functor, typename... ParentResults>
struct functor_result<transwarp::accept_type, Functor, ParentResults...> {
    static_assert(sizeof...(ParentResults) > 0, "An accept task must have at least one parent");
    using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResults>>()...));
};
1519 
/// accept with a vector of parents: the result type of calling the functor with
/// a vector of shared_futures
template<typename Functor, typename ParentResultType>
struct functor_result<transwarp::accept_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
    using type = decltype(std::declval<Functor>()(std::declval<std::vector<std::shared_future<ParentResultType>>>()));
};
1524 
/// accept_any: the result type of calling the functor with a single shared_future
/// of the first parent result type
template<typename Functor, typename... ParentResults>
struct functor_result<transwarp::accept_any_type, Functor, ParentResults...> {
    static_assert(sizeof...(ParentResults) > 0, "An accept_any task must have at least one parent");
    using arg_t = typename std::tuple_element<0, std::tuple<ParentResults...>>::type; // Using first type as reference
    using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<arg_t>>()));
};
1531 
/// accept_any with a vector of parents: the result type of calling the functor
/// with a single shared_future of the parent result type
template<typename Functor, typename ParentResultType>
struct functor_result<transwarp::accept_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
    using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResultType>>()));
};
1536 
/// consume: the result type of calling the functor with one value per parent result
template<typename Functor, typename... ParentResults>
struct functor_result<transwarp::consume_type, Functor, ParentResults...> {
    static_assert(sizeof...(ParentResults) > 0, "A consume task must have at least one parent");
    using type = decltype(std::declval<Functor>()(std::declval<ParentResults>()...));
};
1542 
/// Determines the result type of a consume task's functor. Specialization for
/// vector parents: the functor is invoked with a single vector of results
template<typename Functor, typename ParentResultType>
struct functor_result<transwarp::consume_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
    using type = decltype(std::declval<Functor>()(std::declval<std::vector<ParentResultType>>()));
};
1547 
/// Determines the result type of a consume_any task's functor.
/// The functor is invoked with a single argument whose type is taken from the
/// first parent result type; at least one parent is required
template<typename Functor, typename... ParentResults>
struct functor_result<transwarp::consume_any_type, Functor, ParentResults...> {
    static_assert(sizeof...(ParentResults) > 0, "A consume_any task must have at least one parent");
    using arg_t = typename std::tuple_element<0, std::tuple<ParentResults...>>::type; // Using first type as reference
    using type = decltype(std::declval<Functor>()(std::declval<arg_t>()));
};
1554 
/// Determines the result type of a consume_any task's functor. Specialization for
/// vector parents: the functor is invoked with a single result of the parents'
/// common result type
template<typename Functor, typename ParentResultType>
struct functor_result<transwarp::consume_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
    using type = decltype(std::declval<Functor>()(std::declval<ParentResultType>()));
};
1559 
/// Determines the result type of a wait task's functor.
/// The functor is invoked without arguments (parent results are not passed on);
/// at least one parent is required
template<typename Functor, typename... ParentResults>
struct functor_result<transwarp::wait_type, Functor, ParentResults...> {
    static_assert(sizeof...(ParentResults) > 0, "A wait task must have at least one parent");
    using type = decltype(std::declval<Functor>()());
};
1565 
/// Determines the result type of a wait task's functor. Specialization for
/// vector parents: the functor is invoked without arguments
template<typename Functor, typename ParentResultType>
struct functor_result<transwarp::wait_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
    using type = decltype(std::declval<Functor>()());
};
1570 
/// Determines the result type of a wait_any task's functor.
/// The functor is invoked without arguments (parent results are not passed on);
/// at least one parent is required
template<typename Functor, typename... ParentResults>
struct functor_result<transwarp::wait_any_type, Functor, ParentResults...> {
    static_assert(sizeof...(ParentResults) > 0, "A wait_any task must have at least one parent");
    using type = decltype(std::declval<Functor>()());
};
1576 
/// Determines the result type of a wait_any task's functor. Specialization for
/// vector parents: the functor is invoked without arguments
template<typename Functor, typename ParentResultType>
struct functor_result<transwarp::wait_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
    using type = decltype(std::declval<Functor>()());
};
1581 
1582 
/// Compile-time dispatch helper used by assign_task_if. The boolean parameter
/// selects between the no-op and the assigning specialization
template<bool is_transwarp_functor>
struct assign_task_if_impl;

/// Selected for `false`: the call operator ignores both arguments
template<>
struct assign_task_if_impl<false> {
    template<typename Functor>
    void operator()(Functor&, transwarp::itask&) const noexcept {}
};

/// Selected for `true`: the call operator stores the address of the given task
/// in the functor's transwarp_task_ member
template<>
struct assign_task_if_impl<true> {
    template<typename Functor>
    void operator()(Functor& functor, transwarp::itask& task) const noexcept {
        functor.transwarp_task_ = &task;
    }
};
1599 
1600 /// Assigns the task to the given functor if the functor is a subclass of transwarp::functor
1601 template<typename Functor>
1602 void assign_task_if(Functor& functor, transwarp::itask& task) noexcept {
1603     transwarp::detail::assign_task_if_impl<std::is_base_of<transwarp::functor, Functor>::value>{}(functor, task);
1604 }
1605 
1606 
/// Creates a future that is already ready and holds the given value.
/// The value is perfectly forwarded into the shared state
template<typename ResultType, typename Value>
std::shared_future<ResultType> make_future_with_value(Value&& value) {
    std::promise<ResultType> fulfilled;
    fulfilled.set_value(std::forward<Value>(value));
    return fulfilled.get_future().share();
}
1614 
/// Creates a void future that is already ready
inline
std::shared_future<void> make_ready_future() {
    std::promise<void> fulfilled;
    fulfilled.set_value();
    return fulfilled.get_future().share();
}
1622 
1623 /// Returns a ready future with the given exception as its state
1624 template<typename ResultType>
1625 std::shared_future<ResultType> make_future_with_exception(std::exception_ptr exception) {
1626     if (!exception) {
1627         throw transwarp::invalid_parameter{"exception pointer"};
1628     }
1629     std::promise<ResultType> promise;
1630     promise.set_exception(exception);
1631     return promise.get_future();
1632 }
1633 
1634 
1635 struct clone_task_functor {
1636     explicit clone_task_functor(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) noexcept
1637     : task_cache_(task_cache) {}
1638 
1639     template<typename T>
1640     void operator()(T& t) {
1641         t = transwarp::detail::clone_task(task_cache_, t);
1642     }
1643 
1644     std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache_;
1645 };
1646 
1647 
1648 struct push_task_functor {
1649     explicit push_task_functor(std::vector<transwarp::itask*>& tasks) noexcept
1650     : tasks_(tasks) {}
1651 
1652     template<typename T>
1653     void operator()(T& t) {
1654         tasks_.push_back(t.get());
1655     }
1656 
1657     std::vector<transwarp::itask*>& tasks_;
1658 };
1659 
1660 
1661 /// Determines the type of the parents
1662 template<typename... ParentResults>
1663 struct parents {
1664     using type = std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>;
1665     static std::size_t size(const type&) {
1666         return std::tuple_size<type>::value;
1667     }
1668     static type clone(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const type& obj) {
1669         type cloned = obj;
1670         transwarp::detail::apply_to_each(transwarp::detail::clone_task_functor{task_cache}, cloned);
1671         return cloned;
1672     }
1673     static std::vector<transwarp::itask*> tasks(const type& parents) {
1674         std::vector<transwarp::itask*> tasks;
1675         transwarp::detail::apply_to_each(transwarp::detail::push_task_functor{tasks}, parents);
1676         return tasks;
1677     }
1678 };
1679 
1680 /// Determines the type of the parents. Specialization for vector parents
1681 template<typename ParentResultType>
1682 struct parents<std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1683     using type = std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>;
1684     static std::size_t size(const type& obj) {
1685         return obj.size();
1686     }
1687     static type clone(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const type& obj) {
1688         type cloned = obj;
1689         transwarp::detail::apply_to_each(transwarp::detail::clone_task_functor{task_cache}, cloned);
1690         return cloned;
1691     }
1692     static std::vector<transwarp::itask*> tasks(const type& parents) {
1693         std::vector<transwarp::itask*> tasks;
1694         transwarp::detail::apply_to_each(transwarp::detail::push_task_functor{tasks}, parents);
1695         return tasks;
1696     }
1697 };
1698 
1699 
/// Base class of runner for non-void result types. Owns the promise and
/// knows how to fulfil it with the result of the task call
template<typename ResultType, typename TaskType>
class base_runner {
protected:

    /// Invokes transwarp::detail::call for the given task and parents and
    /// stores the returned value in the promise. An exception thrown by the
    /// call is not caught here and propagates to the caller
    template<typename Task, typename Parents>
    void call(std::size_t task_id,
              const std::weak_ptr<Task>& task,
              const Parents& parents) {
        promise_.set_value(transwarp::detail::call<TaskType, ResultType>(task_id, task, parents));
    }

    std::promise<ResultType> promise_;
};
1713 
/// Base class of runner for the void result type. Owns the promise and
/// marks it as fulfilled once the task call has returned
template<typename TaskType>
class base_runner<void, TaskType> {
protected:

    /// Invokes transwarp::detail::call for the given task and parents and then
    /// sets the (valueless) promise. An exception thrown by the call is not
    /// caught here, in which case the promise is left untouched
    template<typename Task, typename Parents>
    void call(std::size_t task_id,
              const std::weak_ptr<Task>& task,
              const Parents& parents) {
        transwarp::detail::call<TaskType, void>(task_id, task, parents);
        promise_.set_value();
    }

    std::promise<void> promise_;
};
1728 
1729 /// A callable to run a task given its parents
1730 template<typename ResultType, typename TaskType, typename Task, typename Parents>
1731 class runner : public transwarp::detail::base_runner<ResultType, TaskType> {
1732 public:
1733 
1734     runner(std::size_t task_id,
1735            const std::weak_ptr<Task>& task,
1736            const typename transwarp::decay<Parents>::type& parents)
1737     : task_id_(task_id),
1738       task_(task),
1739       parents_(parents)
1740     {}
1741 
1742     std::future<ResultType> future() {
1743         return this->promise_.get_future();
1744     }
1745 
1746     void operator()() {
1747         if (const std::shared_ptr<Task> t = task_.lock()) {
1748             t->raise_event(transwarp::event_type::before_started);
1749         }
1750         try {
1751             this->call(task_id_, task_, parents_);
1752         } catch (const transwarp::task_canceled&) {
1753             this->promise_.set_exception(std::current_exception());
1754             if (const std::shared_ptr<Task> t = task_.lock()) {
1755                 t->raise_event(transwarp::event_type::after_canceled);
1756             }
1757         } catch (...) {
1758             this->promise_.set_exception(std::current_exception());
1759         }
1760         if (const std::shared_ptr<Task> t = task_.lock()) {
1761             t->raise_event(transwarp::event_type::after_finished);
1762         }
1763     }
1764 
1765 private:
1766     const std::size_t task_id_;
1767     const std::weak_ptr<Task> task_;
1768     const typename transwarp::decay<Parents>::type parents_;
1769 };
1770 
1771 
1772 /// A simple circular buffer (FIFO).
1773 /// ValueType must support default construction. The buffer lets you push
1774 /// new values onto the back and pop old values off the front.
1775 template<typename ValueType>
1776 class circular_buffer {
1777 public:
1778 
1779     static_assert(std::is_default_constructible<ValueType>::value, "ValueType must be default constructible");
1780 
1781     using value_type = ValueType;
1782 
1783     /// Constructs a circular buffer with a given fixed capacity
1784     explicit
1785     circular_buffer(std::size_t capacity)
1786     : data_(capacity)
1787     {
1788         if (capacity < 1) {
1789             throw transwarp::invalid_parameter{"capacity"};
1790         }
1791     }
1792 
1793     // delete copy/move semantics
1794     circular_buffer(const circular_buffer&) = delete;
1795     circular_buffer& operator=(const circular_buffer&) = delete;
1796     circular_buffer(circular_buffer&& other) = delete;
1797     circular_buffer& operator=(circular_buffer&&) = delete;
1798 
1799     /// Pushes a new value onto the end of the buffer. If that exceeds the capacity
1800     /// of the buffer then the oldest value gets dropped (the one at the front).
1801     template<typename T, typename = typename std::enable_if<std::is_same<typename std::decay<T>::type, value_type>::value>::type>
1802     void push(T&& value) {
1803         data_[end_] = std::forward<T>(value);
1804         increment();
1805     }
1806 
1807     /// Returns the value at the front of the buffer (the oldest value).
1808     /// This is undefined if the buffer is empty
1809     const value_type& front() const {
1810         return data_[front_];
1811     }
1812 
1813     /// Removes the value at the front of the buffer (the oldest value)
1814     void pop() {
1815         if (!empty()) {
1816             data_[front_] = ValueType{};
1817             decrement();
1818         }
1819     }
1820 
1821     /// Returns the capacity of the buffer
1822     std::size_t capacity() const {
1823         return data_.size();
1824     }
1825 
1826     /// Returns the number of populated values of the buffer. Its maximum value
1827     /// equals the capacity of the buffer
1828     std::size_t size() const {
1829         return size_;
1830     }
1831 
1832     /// Returns whether the buffer is empty
1833     bool empty() const {
1834         return size_ == 0;
1835     }
1836 
1837     /// Returns whether the buffer is full
1838     bool full() const {
1839         return size_ == data_.size();
1840     }
1841 
1842     /// Swaps this buffer with the given buffer
1843     void swap(circular_buffer& buffer) {
1844         std::swap(end_, buffer.end_);
1845         std::swap(front_, buffer.front_);
1846         std::swap(size_, buffer.size_);
1847         std::swap(data_, buffer.data_);
1848     }
1849 
1850 private:
1851 
1852     void increment_or_wrap(std::size_t& value) const {
1853         if (value == data_.size() - 1) {
1854             value = 0;
1855         } else {
1856             ++value;
1857         }
1858     }
1859 
1860     void increment() {
1861         increment_or_wrap(end_);
1862         if (full()) {
1863             increment_or_wrap(front_);
1864         } else {
1865             ++size_;
1866         }
1867     }
1868 
1869     void decrement() {
1870         increment_or_wrap(front_);
1871         --size_;
1872     }
1873 
1874     std::size_t end_{};
1875     std::size_t front_{};
1876     std::size_t size_{};
1877     std::vector<value_type> data_;
1878 };
1879 
1880 
/// A minimal busy-waiting lock built on std::atomic_flag.
/// Provides lock() and unlock() so it can be used with std::lock_guard
class spinlock {
public:

    /// Spins until the flag has been acquired
    void lock() noexcept {
        for (;;) {
            const bool was_locked = locked_.test_and_set(std::memory_order_acquire);
            if (!was_locked) {
                return;
            }
        }
    }

    /// Clears the flag so that a subsequent lock() can succeed
    void unlock() noexcept {
        locked_.clear(std::memory_order_release);
    }

private:
    std::atomic_flag locked_ = ATOMIC_FLAG_INIT;
};
1895 
1896 
1897 } // detail
1898 
1899 
/// A functor that does nothing when called
struct no_op_functor {
    void operator()() const noexcept {}
};
1904 
/// A ready-made no_op_functor instance to use in places where a no-op functor is required
constexpr no_op_functor no_op{};
1907 
1908 
1909 /// Executor for sequential execution. Runs functors sequentially on the same thread
1910 class sequential : public transwarp::executor {
1911 public:
1912 
1913     sequential() = default;
1914 
1915     // delete copy/move semantics
1916     sequential(const sequential&) = delete;
1917     sequential& operator=(const sequential&) = delete;
1918     sequential(sequential&&) = delete;
1919     sequential& operator=(sequential&&) = delete;
1920 
1921     /// Returns the name of the executor
1922     std::string name() const override {
1923         return "transwarp::sequential";
1924     }
1925 
1926     /// Runs the functor on the current thread
1927     void execute(const std::function<void()>& functor, transwarp::itask&) override {
1928         functor();
1929     }
1930 };
1931 
1932 
1933 /// Executor for parallel execution. Uses a simple thread pool
1934 class parallel : public transwarp::executor {
1935 public:
1936 
1937     explicit parallel(const std::size_t n_threads,
1938                       std::function<void(std::size_t thread_index)> on_thread_started = nullptr)
1939     : pool_{n_threads, std::move(on_thread_started)}
1940     {}
1941 
1942     // delete copy/move semantics
1943     parallel(const parallel&) = delete;
1944     parallel& operator=(const parallel&) = delete;
1945     parallel(parallel&&) = delete;
1946     parallel& operator=(parallel&&) = delete;
1947 
1948     /// Returns the name of the executor
1949     std::string name() const override {
1950         return "transwarp::parallel";
1951     }
1952 
1953     /// Pushes the functor into the thread pool for asynchronous execution
1954     void execute(const std::function<void()>& functor, transwarp::itask&) override {
1955         pool_.push(functor);
1956     }
1957 
1958 private:
1959     transwarp::detail::thread_pool pool_;
1960 };
1961 
1962 
1963 /// Detail namespace for internal functionality only
1964 namespace detail {
1965 
/// Default-constructed name, returned by task_common::name() when task names are disabled
const transwarp::option_str nullopt_string;
/// Default-constructed custom data, returned by task_common::custom_data() when custom data is disabled
const transwarp::any_data any_empty;
1968 
1969 
/// Helper that assigns to `future` a new ready future built from the state of
/// `other`. The boolean parameter selects the void or the non-void variant
template<typename ResultType, bool is_void>
struct make_future_functor;

/// Void variant: calls get() on `other` (which rethrows a stored exception)
/// and then assigns a ready void future
template<typename ResultType>
struct make_future_functor<ResultType, true> {
    template<typename Future, typename OtherFuture>
    void operator()(Future& future, const OtherFuture& other) const {
        other.get();
        future = transwarp::detail::make_ready_future();
    }
};

/// Non-void variant: assigns a ready future holding the value obtained from
/// get() on `other` (which rethrows a stored exception)
template<typename ResultType>
struct make_future_functor<ResultType, false> {
    template<typename Future, typename OtherFuture>
    void operator()(Future& future, const OtherFuture& other) const {
        future = transwarp::detail::make_future_with_value<ResultType>(other.get());
    }
};
1989 
1990 
1991 /// Common task functionality shared across `task_impl` and `value_task`
1992 template<typename ResultType>
1993 class task_common : public transwarp::task<ResultType> {
1994 public:
1995     /// The result type of this task
1996     using result_type = ResultType;
1997 
1998     /// The task's id
1999     std::size_t id() const noexcept override {
2000         return id_;
2001     }
2002 
2003     /// The optional task name
2004     const transwarp::option_str& name() const noexcept override {
2005 #ifndef TRANSWARP_DISABLE_TASK_NAME
2006         return name_;
2007 #else
2008         return transwarp::detail::nullopt_string;
2009 #endif
2010     }
2011 
2012     /// The task priority (defaults to 0)
2013     std::int64_t priority() const noexcept override {
2014 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2015         return priority_;
2016 #else
2017         return 0;
2018 #endif
2019     }
2020 
2021     /// The custom task data (may not hold a value)
2022     const transwarp::any_data& custom_data() const noexcept override {
2023 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2024         return custom_data_;
2025 #else
2026         return transwarp::detail::any_empty;
2027 #endif
2028     }
2029 
2030     /// Sets a task priority (defaults to 0). transwarp will not directly use this.
2031     /// This is only useful if something else is using the priority (e.g. a custom executor)
2032     void set_priority(std::int64_t priority) override {
2033 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2034         ensure_task_not_running();
2035         priority_ = priority;
2036 #else
2037         (void)priority;
2038 #endif
2039     }
2040 
2041     /// Resets the task priority to 0
2042     void reset_priority() override {
2043 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2044         ensure_task_not_running();
2045         priority_ = 0;
2046 #endif
2047     }
2048 
2049     /// Assigns custom data to this task. transwarp will not directly use this.
2050     /// This is only useful if something else is using this custom data (e.g. a custom executor)
2051     void set_custom_data(transwarp::any_data custom_data) override {
2052 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2053         ensure_task_not_running();
2054         if (!custom_data.has_value()) {
2055             throw transwarp::invalid_parameter{"custom data"};
2056         }
2057         custom_data_ = std::move(custom_data);
2058         raise_event(transwarp::event_type::after_custom_data_set);
2059 #else
2060         (void)custom_data;
2061 #endif
2062     }
2063 
2064     /// Removes custom data from this task
2065     void remove_custom_data() override {
2066 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2067         ensure_task_not_running();
2068         custom_data_ = {};
2069         raise_event(transwarp::event_type::after_custom_data_set);
2070 #endif
2071     }
2072 
2073     /// Returns the future associated to the underlying execution
2074     std::shared_future<result_type> future() const noexcept override {
2075         return future_;
2076     }
2077 
2078     /// Adds a new listener for all event types
2079     void add_listener(std::shared_ptr<transwarp::listener> listener) override {
2080         ensure_task_not_running();
2081         check_listener(listener);
2082         ensure_listeners_object();
2083         for (int i=0; i<static_cast<int>(transwarp::event_type::count); ++i) {
2084             (*listeners_)[static_cast<transwarp::event_type>(i)].push_back(listener);
2085         }
2086     }
2087 
2088     /// Adds a new listener for the given event type only
2089     void add_listener(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) override {
2090         ensure_task_not_running();
2091         check_listener(listener);
2092         ensure_listeners_object();
2093         (*listeners_)[event].push_back(std::move(listener));
2094     }
2095 
2096     /// Removes the listener for all event types
2097     void remove_listener(const std::shared_ptr<transwarp::listener>& listener) override {
2098         ensure_task_not_running();
2099         check_listener(listener);
2100         if (!listeners_) {
2101             return;
2102         }
2103         for (int i=0; i<static_cast<int>(transwarp::event_type::count); ++i) {
2104             auto listeners_pair = listeners_->find(static_cast<transwarp::event_type>(i));
2105             if (listeners_pair != listeners_->end()) {
2106                 std::vector<std::shared_ptr<transwarp::listener>>& l = listeners_pair->second;
2107                 l.erase(std::remove(l.begin(), l.end(), listener), l.end());
2108             }
2109         }
2110     }
2111 
2112     /// Removes the listener for the given event type only
2113     void remove_listener(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) override {
2114         ensure_task_not_running();
2115         check_listener(listener);
2116         if (!listeners_) {
2117             return;
2118         }
2119         auto listeners_pair = listeners_->find(event);
2120         if (listeners_pair != listeners_->end()) {
2121             std::vector<std::shared_ptr<transwarp::listener>>& l = listeners_pair->second;
2122             l.erase(std::remove(l.begin(), l.end(), listener), l.end());
2123         }
2124     }
2125 
2126     /// Removes all listeners
2127     void remove_listeners() override {
2128         ensure_task_not_running();
2129         if (!listeners_) {
2130             return;
2131         }
2132         listeners_->clear();
2133     }
2134 
2135     /// Removes all listeners for the given event type
2136     void remove_listeners(transwarp::event_type event) override {
2137         ensure_task_not_running();
2138         if (!listeners_) {
2139             return;
2140         }
2141         auto listeners_pair = listeners_->find(event);
2142         if (listeners_pair != listeners_->end()) {
2143             listeners_pair->second.clear();
2144         }
2145     }
2146 
2147 protected:
2148 
2149     using listeners_t = std::map<transwarp::event_type, std::vector<std::shared_ptr<transwarp::listener>>>;
2150     using tasks_t = std::vector<transwarp::itask*>;
2151 
2152     /// Checks if the task is currently running and throws transwarp::control_error if it is
2153     void ensure_task_not_running() const {
2154         if (future_.valid() && future_.wait_for(std::chrono::seconds{0}) != std::future_status::ready) {
2155             throw transwarp::control_error{"task currently running: " + transwarp::to_string(*this, " ")};
2156         }
2157     }
2158 
2159     /// Raises the given event to all listeners
2160     void raise_event(transwarp::event_type event) {
2161         if (!listeners_) {
2162             return;
2163         }
2164         auto listeners_pair = listeners_->find(event);
2165         if (listeners_pair != listeners_->end()) {
2166             for (const std::shared_ptr<transwarp::listener>& listener : listeners_pair->second) {
2167                 listener->handle_event(event, *this);
2168             }
2169         }
2170     }
2171 
2172     /// Check for non-null listener pointer
2173     void check_listener(const std::shared_ptr<transwarp::listener>& listener) const {
2174         if (!listener) {
2175             throw transwarp::invalid_parameter{"listener pointer"};
2176         }
2177     }
2178 
2179     void ensure_listeners_object() {
2180         if (!listeners_) {
2181             listeners_.reset(new listeners_t);
2182         }
2183     }
2184 
2185     /// Assigns the given id
2186     void set_id(std::size_t id) noexcept override {
2187         id_ = id;
2188     }
2189 
2190     /// Assigns the given name
2191     void set_name(transwarp::option_str name) noexcept override {
2192 #ifndef TRANSWARP_DISABLE_TASK_NAME
2193         name_ = std::move(name);
2194 #else
2195         (void)name;
2196 #endif
2197     }
2198 
2199     void copy_from(const task_common& task) {
2200         id_ = task.id_;
2201 #ifndef TRANSWARP_DISABLE_TASK_NAME
2202         name_ = task.name_;
2203 #endif
2204 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2205         priority_ = task.priority_;
2206 #endif
2207 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2208         custom_data_ = task.custom_data_;
2209 #endif
2210         if (task.has_result()) {
2211             try {
2212                 make_future_functor<result_type, std::is_void<result_type>::value>{}(future_, task.future_);
2213             } catch (...) {
2214                 future_ = transwarp::detail::make_future_with_exception<result_type>(std::current_exception());
2215             }
2216         }
2217         visited_ = task.visited_;
2218         if (task.listeners_) {
2219             listeners_.reset(new listeners_t(*task.listeners_));
2220         }
2221     }
2222 
2223     std::size_t id_ = 0;
2224 #ifndef TRANSWARP_DISABLE_TASK_NAME
2225     transwarp::option_str name_;
2226 #endif
2227 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2228     std::int64_t priority_ = 0;
2229 #endif
2230 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2231     transwarp::any_data custom_data_;
2232 #endif
2233     std::shared_future<result_type> future_;
2234     bool visited_ = false;
2235     std::unique_ptr<listeners_t> listeners_;
2236     std::unique_ptr<tasks_t> tasks_;
2237 };
2238 
2239 
2240 /// The base task class that contains the functionality that can be used
2241 /// with all result types (void and non-void).
2242 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
2243 class task_impl_base : public transwarp::detail::task_common<ResultType> {
2244 public:
2245     /// The task type
2246     using task_type = TaskType;
2247 
2248     /// The result type of this task
2249     using result_type = ResultType;
2250 
2251     /// Can be called to explicitly finalize this task making this task
2252     /// the terminal task in the graph. This is also done implicitly when
2253     /// calling, e.g., any of the *_all methods. It should normally not be
2254     /// necessary to call this method directly
2255     void finalize() override {
2256         if (!this->tasks_) {
2257             this->tasks_.reset(new typename transwarp::detail::task_common<result_type>::tasks_t);
2258             visit(transwarp::detail::final_visitor{*this->tasks_});
2259             unvisit();
2260             auto compare = [](const transwarp::itask* const l, const transwarp::itask* const r) {
2261                 const std::size_t l_level = l->level();
2262                 const std::size_t l_id = l->id();
2263                 const std::size_t r_level = r->level();
2264                 const std::size_t r_id = r->id();
2265                 return std::tie(l_level, l_id) < std::tie(r_level, r_id);
2266             };
2267             std::sort(this->tasks_->begin(), this->tasks_->end(), compare);
2268         }
2269     }
2270 
    /// Returns the level of this task as stored in level_
    std::size_t level() const noexcept override {
        return level_;
    }
2275 
    /// Returns the type of this task as stored in type_
    transwarp::task_type type() const noexcept override {
        return type_;
    }
2280 
    /// Returns the executor assigned to this task. The pointer is null if no
    /// task-specific executor is set
    std::shared_ptr<transwarp::executor> executor() const noexcept override {
        return executor_;
    }
2285 
    /// Returns whether the associated task is canceled by loading the
    /// current value of the canceled_ flag
    bool canceled() const noexcept override {
        return canceled_.load();
    }
2290 
2291     /// Returns the average idletime in microseconds (-1 if never set)
2292     std::int64_t avg_idletime_us() const noexcept override {
2293 #ifndef TRANSWARP_DISABLE_TASK_TIME
2294         return avg_idletime_us_.load();
2295 #else
2296         return -1;
2297 #endif
2298     }
2299 
2300     /// Returns the average waittime in microseconds (-1 if never set)
2301     std::int64_t avg_waittime_us() const noexcept override {
2302 #ifndef TRANSWARP_DISABLE_TASK_TIME
2303         return avg_waittime_us_.load();
2304 #else
2305         return -1;
2306 #endif
2307     }
2308 
2309     /// Returns the average runtime in microseconds (-1 if never set)
2310     std::int64_t avg_runtime_us() const noexcept override {
2311 #ifndef TRANSWARP_DISABLE_TASK_TIME
2312         return avg_runtime_us_.load();
2313 #else
2314         return -1;
2315 #endif
2316     }
2317 
2318     /// Assigns an executor to this task which takes precedence over
2319     /// the executor provided in schedule() or schedule_all()
2320     void set_executor(std::shared_ptr<transwarp::executor> executor) override {
2321         this->ensure_task_not_running();
2322         if (!executor) {
2323             throw transwarp::invalid_parameter{"executor pointer"};
2324         }
2325         executor_ = std::move(executor);
2326     }
2327 
    /// Assigns an executor to all tasks which takes precedence over
    /// the executor provided in schedule() or schedule_all().
    /// Throws transwarp::control_error if this task is currently running.
    /// The executor is not checked for null here; it is handed as is to a
    /// set_executor_visitor which is then applied through visit_all
    void set_executor_all(std::shared_ptr<transwarp::executor> executor) override {
        this->ensure_task_not_running();
        transwarp::detail::set_executor_visitor visitor{std::move(executor)};
        visit_all(visitor);
    }
2335 
2336     /// Removes the executor from this task
2337     void remove_executor() override {
2338         this->ensure_task_not_running();
2339         executor_.reset();
2340     }
2341 
    /// Removes the executor from all tasks by applying a
    /// remove_executor_visitor through visit_all.
    /// Throws transwarp::control_error if this task is currently running
    void remove_executor_all() override {
        this->ensure_task_not_running();
        transwarp::detail::remove_executor_visitor visitor;
        visit_all(visitor);
    }
2348 
    /// Schedules this task for execution on the caller thread.
    /// The task-specific executor gets precedence if it exists.
    /// This overload will reset the underlying future.
    /// Throws transwarp::control_error if the task is currently running
    void schedule() override {
        this->ensure_task_not_running();
        this->schedule_impl(true);  // reset = true, no executor passed
    }
2356 
    /// Schedules this task for execution on the caller thread.
    /// The task-specific executor gets precedence if it exists.
    /// reset denotes whether schedule should reset the underlying
    /// future and schedule even if the future is already valid.
    /// Throws transwarp::control_error if the task is currently running
    void schedule(bool reset) override {
        this->ensure_task_not_running();
        this->schedule_impl(reset);  // no executor passed
    }
2365 
    /// Schedules this task for execution using the provided executor.
    /// The task-specific executor gets precedence if it exists.
    /// This overload will reset the underlying future.
    /// Throws transwarp::control_error if the task is currently running
    void schedule(transwarp::executor& executor) override {
        this->ensure_task_not_running();
        this->schedule_impl(true, &executor);  // reset = true
    }
2373 
    /// Schedules this task for execution using the provided executor.
    /// The task-specific executor gets precedence if it exists.
    /// reset denotes whether schedule should reset the underlying
    /// future and schedule even if the future is already valid.
    /// Throws transwarp::control_error if the task is currently running
    void schedule(transwarp::executor& executor, bool reset) override {
        this->ensure_task_not_running();
        this->schedule_impl(reset, &executor);
    }
2382 
    /// Schedules all tasks in the graph for execution on the caller thread.
    /// The task-specific executors get precedence if they exist.
    /// This overload will reset the underlying futures.
    /// Throws transwarp::control_error if this task is currently running
    void schedule_all() override {
        this->ensure_task_not_running();
        schedule_all_impl(true);  // reset_all = true, no executor passed
    }
2390 
2391     /// Schedules all tasks in the graph for execution using the provided executor.
2392     /// The task-specific executors get precedence if they exist.
2393     /// This overload will reset the underlying futures.
2394     void schedule_all(transwarp::executor& executor) override {
2395         this->ensure_task_not_running();
2396         schedule_all_impl(true, &executor);
2397     }
2398 
2399     /// Schedules all tasks in the graph for execution on the caller thread.
2400     /// The task-specific executors get precedence if they exist.
2401     /// reset_all denotes whether schedule_all should reset the underlying
2402     /// futures and schedule even if the futures are already present.
2403     void schedule_all(bool reset_all) override {
2404         this->ensure_task_not_running();
2405         schedule_all_impl(reset_all);
2406     }
2407 
2408     /// Schedules all tasks in the graph for execution using the provided executor.
2409     /// The task-specific executors get precedence if they exist.
2410     /// reset_all denotes whether schedule_all should reset the underlying
2411     /// futures and schedule even if the futures are already present.
2412     void schedule_all(transwarp::executor& executor, bool reset_all) override {
2413         this->ensure_task_not_running();
2414         schedule_all_impl(reset_all, &executor);
2415     }
2416 
2417     /// Assigns an exception to this task. Scheduling will have no effect after an exception
2418     /// has been set. Calling reset() will remove the exception and re-enable scheduling.
2419     void set_exception(std::exception_ptr exception) override {
2420         this->ensure_task_not_running();
2421         this->future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
2422         schedule_mode_ = false;
2423         this->raise_event(transwarp::event_type::after_future_changed);
2424     }
2425 
2426     /// Returns whether the task was scheduled and not reset afterwards.
2427     /// This means that the underlying future is valid
2428     bool was_scheduled() const noexcept override {
2429         return this->future_.valid();
2430     }
2431 
2432     /// Waits for the task to complete. Should only be called if was_scheduled()
2433     /// is true, throws transwarp::control_error otherwise
2434     void wait() const override {
2435         ensure_task_was_scheduled();
2436         this->future_.wait();
2437     }
2438 
2439     /// Returns whether the task has finished processing. Should only be called
2440     /// if was_scheduled() is true, throws transwarp::control_error otherwise
2441     bool is_ready() const override {
2442         ensure_task_was_scheduled();
2443         return this->future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
2444     }
2445 
2446     /// Returns whether this task contains a result
2447     bool has_result() const noexcept override {
2448         return was_scheduled() && this->future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
2449     }
2450 
2451     /// Resets this task
2452     void reset() override {
2453         this->ensure_task_not_running();
2454         this->future_ = std::shared_future<result_type>{};
2455         cancel(false);
2456         schedule_mode_ = true;
2457 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2458         refcount_ = childcount_;
2459 #endif
2460         this->raise_event(transwarp::event_type::after_future_changed);
2461     }
2462 
2463     /// Resets all tasks in the graph
2464     void reset_all() override {
2465         this->ensure_task_not_running();
2466         transwarp::detail::reset_visitor visitor;
2467         visit_all(visitor);
2468     }
2469 
2470     /// If enabled then this task is canceled which will
2471     /// throw transwarp::task_canceled when retrieving the task result.
2472     /// Passing false is equivalent to resume.
2473     void cancel(bool enabled) noexcept override {
2474         canceled_ = enabled;
2475     }
2476 
2477     /// If enabled then all pending tasks in the graph are canceled which will
2478     /// throw transwarp::task_canceled when retrieving the task result.
2479     /// Passing false is equivalent to resume.
2480     void cancel_all(bool enabled) noexcept override {
2481         transwarp::detail::cancel_visitor visitor{enabled};
2482         visit_all(visitor);
2483     }
2484 
2485     /// Sets a priority to all tasks (defaults to 0). transwarp will not directly use this.
2486     /// This is only useful if something else is using the priority (e.g. a custom executor)
2487     void set_priority_all(std::int64_t priority) override {
2488 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2489         this->ensure_task_not_running();
2490         transwarp::detail::set_priority_visitor visitor{priority};
2491         visit_all(visitor);
2492 #else
2493         (void)priority;
2494 #endif
2495     }
2496 
2497     /// Resets the priority of all tasks to 0
2498     void reset_priority_all() override {
2499 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2500         this->ensure_task_not_running();
2501         transwarp::detail::reset_priority_visitor visitor;
2502         visit_all(visitor);
2503 #endif
2504     }
2505 
2506     /// Assigns custom data to all tasks. transwarp will not directly use this.
2507     /// This is only useful if something else is using this custom data (e.g. a custom executor)
2508     void set_custom_data_all(transwarp::any_data custom_data) override {
2509 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2510         this->ensure_task_not_running();
2511         transwarp::detail::set_custom_data_visitor visitor{std::move(custom_data)};
2512         visit_all(visitor);
2513 #else
2514         (void)custom_data;
2515 #endif
2516     }
2517 
2518     /// Removes custom data from all tasks
2519     void remove_custom_data_all() override {
2520 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2521         this->ensure_task_not_running();
2522         transwarp::detail::remove_custom_data_visitor visitor;
2523         visit_all(visitor);
2524 #endif
2525     }
2526 
2527     /// Adds a new listener for all event types and for all parents
2528     void add_listener_all(std::shared_ptr<transwarp::listener> listener) override {
2529         this->ensure_task_not_running();
2530         transwarp::detail::add_listener_visitor visitor{std::move(listener)};
2531         visit_all(visitor);
2532     }
2533 
2534     /// Adds a new listener for the given event type only and for all parents
2535     void add_listener_all(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) override {
2536         this->ensure_task_not_running();
2537         transwarp::detail::add_listener_per_event_visitor visitor{event, std::move(listener)};
2538         visit_all(visitor);
2539     }
2540 
2541     /// Removes the listener for all event types and for all parents
2542     void remove_listener_all(const std::shared_ptr<transwarp::listener>& listener) override {
2543         this->ensure_task_not_running();
2544         transwarp::detail::remove_listener_visitor visitor{std::move(listener)};
2545         visit_all(visitor);
2546     }
2547 
2548     /// Removes the listener for the given event type only and for all parents
2549     void remove_listener_all(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) override {
2550         this->ensure_task_not_running();
2551         transwarp::detail::remove_listener_per_event_visitor visitor{event, std::move(listener)};
2552         visit_all(visitor);
2553     }
2554 
2555     /// Removes all listeners and for all parents
2556     void remove_listeners_all() override {
2557         this->ensure_task_not_running();
2558         transwarp::detail::remove_listeners_visitor visitor;
2559         visit_all(visitor);
2560     }
2561 
2562     /// Removes all listeners for the given event type and for all parents
2563     void remove_listeners_all(transwarp::event_type event) override {
2564         this->ensure_task_not_running();
2565         transwarp::detail::remove_listeners_per_event_visitor visitor{event};
2566         visit_all(visitor);
2567     }
2568 
2569     /// Returns the task's parents (may be empty)
2570     std::vector<transwarp::itask*> parents() const override {
2571         return transwarp::detail::parents<ParentResults...>::tasks(parents_);
2572     }
2573 
2574     /// Returns all tasks in the graph in breadth order
2575     const std::vector<transwarp::itask*>& tasks() override {
2576         finalize();
2577         return *this->tasks_;
2578     }
2579 
2580     /// Returns all edges in the graph. This is mainly for visualizing
2581     /// the tasks and their interdependencies. Pass the result into transwarp::to_string
2582     /// to retrieve a dot-style graph representation for easy viewing.
2583     std::vector<transwarp::edge> edges() override {
2584         std::vector<transwarp::edge> edges;
2585         transwarp::detail::edges_visitor visitor{edges};
2586         visit_all(visitor);
2587         return edges;
2588     }
2589 
2590 protected:
2591 
2592     task_impl_base() {}
2593 
2594     template<typename F>
2595     task_impl_base(F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2596     : functor_(new Functor(std::forward<F>(functor))),
2597       parents_(std::move(parents)...)
2598     {
2599         init();
2600     }
2601 
2602     template<typename F, typename P>
2603     task_impl_base(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2604     : functor_(new Functor(std::forward<F>(functor))),
2605       parents_(std::move(parents))
2606     {
2607         init();
2608         if (parents_.empty()) {
2609             set_type(transwarp::task_type::root);
2610         }
2611     }
2612 
2613     void init() {
2614         set_type(task_type::value);
2615         transwarp::detail::assign_task_if(*functor_, *this);
2616         transwarp::detail::call_with_each(transwarp::detail::parent_visitor{*this}, parents_);
2617     }
2618 
    // The runner class template is a friend so it can reach the
    // non-public members of this class
    template<typename R, typename Y, typename T, typename P>
    friend class transwarp::detail::runner;

    // The run_task function template is a friend for the same reason
    template<typename R, typename T, typename... A>
    friend R transwarp::detail::run_task(std::size_t, const std::weak_ptr<T>&, A&&...);
2624 
2625     /// Assigns the given level
2626     void set_level(std::size_t level) noexcept override {
2627         level_ = level;
2628     }
2629 
2630     /// Assigns the given type
2631     void set_type(transwarp::task_type type) noexcept override {
2632         type_ = type;
2633     }
2634 
2635     /// Assigns the given idletime
2636     void set_avg_idletime_us(std::int64_t idletime) noexcept override {
2637 #ifndef TRANSWARP_DISABLE_TASK_TIME
2638         avg_idletime_us_ = idletime;
2639 #else
2640         (void)idletime;
2641 #endif
2642     }
2643 
2644     /// Assigns the given waittime
2645     void set_avg_waittime_us(std::int64_t waittime) noexcept override {
2646 #ifndef TRANSWARP_DISABLE_TASK_TIME
2647         avg_waittime_us_ = waittime;
2648 #else
2649         (void)waittime;
2650 #endif
2651     }
2652 
2653     /// Assigns the given runtime
2654     void set_avg_runtime_us(std::int64_t runtime) noexcept override {
2655 #ifndef TRANSWARP_DISABLE_TASK_TIME
2656         avg_runtime_us_ = runtime;
2657 #else
2658         (void)runtime;
2659 #endif
2660     }
2661 
2662     /// Checks if the task was scheduled and throws transwarp::control_error if it's not
2663     void ensure_task_was_scheduled() const {
2664         if (!this->future_.valid()) {
2665             throw transwarp::control_error{"task was not scheduled: " + transwarp::to_string(*this, " ")};
2666         }
2667     }
2668 
2669     /// Schedules this task for execution using the provided executor.
2670     /// The task-specific executor gets precedence if it exists.
2671     /// Runs the task on the same thread as the caller if neither the global
2672     /// nor the task-specific executor is found.
2673     void schedule_impl(bool reset, transwarp::executor* executor=nullptr) override {
2674         if (schedule_mode_ && (reset || !this->future_.valid())) {
2675             if (reset) {
2676                 cancel(false);
2677             }
2678 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2679             refcount_ = childcount_;
2680 #endif
2681             std::weak_ptr<task_impl_base> self = std::static_pointer_cast<task_impl_base>(this->shared_from_this());
2682             using runner_t = transwarp::detail::runner<result_type, task_type, task_impl_base, decltype(parents_)>;
2683             std::shared_ptr<runner_t> runner = std::shared_ptr<runner_t>(new runner_t(this->id(), self, parents_));
2684             this->raise_event(transwarp::event_type::before_scheduled);
2685             this->future_ = runner->future();
2686             this->raise_event(transwarp::event_type::after_future_changed);
2687             if (this->executor_) {
2688                 this->executor_->execute([runner]{ (*runner)(); }, *this);
2689             } else if (executor) {
2690                 executor->execute([runner]{ (*runner)(); }, *this);
2691             } else {
2692                 (*runner)();
2693             }
2694         }
2695     }
2696 
2697     /// Schedules all tasks in the graph for execution using the provided executor.
2698     /// The task-specific executors get precedence if they exist.
2699     /// Runs tasks on the same thread as the caller if neither the global
2700     /// nor a task-specific executor is found.
2701     void schedule_all_impl(bool reset_all, transwarp::executor* executor=nullptr) {
2702         transwarp::detail::schedule_visitor visitor{reset_all, executor};
2703         visit_all(visitor);
2704     }
2705 
2706     /// Visits each task in a depth-first traversal
2707     void visit(const std::function<void(transwarp::itask&)>& visitor) override {
2708         if (!this->visited_) {
2709             transwarp::detail::call_with_each(transwarp::detail::visit_visitor{visitor}, parents_);
2710             visitor(*this);
2711             this->visited_ = true;
2712         }
2713     }
2714 
2715     /// Traverses through each task and marks them as not visited.
2716     void unvisit() noexcept override {
2717         if (this->visited_) {
2718             this->visited_ = false;
2719             transwarp::detail::call_with_each(transwarp::detail::unvisit_visitor{}, parents_);
2720         }
2721     }
2722 
2723     /// Visits all tasks
2724     template<typename Visitor>
2725     void visit_all(Visitor& visitor) {
2726         finalize();
2727         for (transwarp::itask* t : *this->tasks_) {
2728             visitor(*t);
2729         }
2730     }
2731 
2732     void increment_childcount() noexcept override {
2733 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2734         ++childcount_;
2735 #endif
2736     }
2737 
2738     void decrement_refcount() override {
2739 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2740         if (--refcount_ == 0) {
2741             this->raise_event(transwarp::event_type::after_satisfied);
2742         }
2743 #endif
2744     }
2745 
2746     void reset_future() override {
2747         this->future_ = std::shared_future<result_type>{};
2748         this->raise_event(transwarp::event_type::after_future_changed);
2749     }
2750 
    // Level of this task, written by set_level()
    std::size_t level_ = 0;
    // Type of this task, written by set_type()
    transwarp::task_type type_ = transwarp::task_type::root;
    // Task-specific executor; preferred over a provided executor when scheduling
    std::shared_ptr<transwarp::executor> executor_;
    // Cancel flag, written by cancel()
    std::atomic<bool> canceled_{false};
    // Scheduling only has an effect while this is true; cleared by
    // set_value()/set_exception() and set again by reset()
    bool schedule_mode_ = true;
#ifndef TRANSWARP_DISABLE_TASK_TIME
    // Average times in microseconds, written by the set_avg_*_us() setters; start at -1
    std::atomic<std::int64_t> avg_idletime_us_{-1};
    std::atomic<std::int64_t> avg_waittime_us_{-1};
    std::atomic<std::int64_t> avg_runtime_us_{-1};
#endif
    // The functor of this task, owned by the task
    std::unique_ptr<Functor> functor_;
    // The parent tasks
    typename transwarp::detail::parents<ParentResults...>::type parents_;
#ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
    // Incremented by increment_childcount()
    std::size_t childcount_ = 0;
    // Set to childcount_ on schedule and reset; decremented by decrement_refcount()
    std::atomic<std::size_t> refcount_{0};
#endif
2767 };
2768 
2769 
2770 /// A task proxy
2771 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
2772 class task_impl_proxy : public transwarp::detail::task_impl_base<ResultType, TaskType, Functor, ParentResults...> {
2773 public:
2774     /// The task type
2775     using task_type = TaskType;
2776 
2777     /// The result type of this task
2778     using result_type = ResultType;
2779 
2780     /// Assigns a value to this task. Scheduling will have no effect after a value
2781     /// has been set. Calling reset() will remove the value and re-enable scheduling.
2782     void set_value(const typename transwarp::decay<result_type>::type& value) override {
2783         this->ensure_task_not_running();
2784         this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2785         this->schedule_mode_ = false;
2786         this->raise_event(transwarp::event_type::after_future_changed);
2787     }
2788 
2789     /// Assigns a value to this task. Scheduling will have no effect after a value
2790     /// has been set. Calling reset() will remove the value and re-enable scheduling.
2791     void set_value(typename transwarp::decay<result_type>::type&& value) override {
2792         this->ensure_task_not_running();
2793         this->future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2794         this->schedule_mode_ = false;
2795         this->raise_event(transwarp::event_type::after_future_changed);
2796     }
2797 
2798     /// Returns the result of this task. Throws any exceptions that the underlying
2799     /// functor throws. Should only be called if was_scheduled() is true,
2800     /// throws transwarp::control_error otherwise
2801     typename transwarp::result<result_type>::type get() const override {
2802         this->ensure_task_was_scheduled();
2803         return this->future_.get();
2804     }
2805 
2806 protected:
2807 
2808     task_impl_proxy() = default;
2809 
2810     template<typename F>
2811     task_impl_proxy(F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2812     : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents)...)
2813     {}
2814 
2815     template<typename F, typename P>
2816     task_impl_proxy(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2817     : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents))
2818     {}
2819 
2820 };
2821 
2822 /// A task proxy for reference result type.
2823 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
2824 class task_impl_proxy<ResultType&, TaskType, Functor, ParentResults...> : public transwarp::detail::task_impl_base<ResultType&, TaskType, Functor, ParentResults...> {
2825 public:
2826     /// The task type
2827     using task_type = TaskType;
2828 
2829     /// The result type of this task
2830     using result_type = ResultType&;
2831 
2832     /// Assigns a value to this task. Scheduling will have no effect after a value
2833     /// has been set. Calling reset() will remove the value and re-enable scheduling.
2834     void set_value(typename transwarp::decay<result_type>::type& value) override {
2835         this->ensure_task_not_running();
2836         this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2837         this->schedule_mode_ = false;
2838         this->raise_event(transwarp::event_type::after_future_changed);
2839     }
2840 
2841     /// Returns the result of this task. Throws any exceptions that the underlying
2842     /// functor throws. Should only be called if was_scheduled() is true,
2843     /// throws transwarp::control_error otherwise
2844     typename transwarp::result<result_type>::type get() const override {
2845         this->ensure_task_was_scheduled();
2846         return this->future_.get();
2847     }
2848 
2849 protected:
2850 
2851     task_impl_proxy() = default;
2852 
2853     template<typename F>
2854     task_impl_proxy(F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2855     : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents)...)
2856     {}
2857 
2858     template<typename F, typename P>
2859     task_impl_proxy(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2860     : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents))
2861     {}
2862 
2863 };
2864 
2865 /// A task proxy for void result type.
2866 template<typename TaskType, typename Functor, typename... ParentResults>
2867 class task_impl_proxy<void, TaskType, Functor, ParentResults...> : public transwarp::detail::task_impl_base<void, TaskType, Functor, ParentResults...> {
2868 public:
2869     /// The task type
2870     using task_type = TaskType;
2871 
2872     /// The result type of this task
2873     using result_type = void;
2874 
2875     /// Assigns a value to this task. Scheduling will have no effect after a call
2876     /// to this. Calling reset() will reset this and re-enable scheduling.
2877     void set_value() override {
2878         this->ensure_task_not_running();
2879         this->future_ = transwarp::detail::make_ready_future();
2880         this->schedule_mode_ = false;
2881         this->raise_event(transwarp::event_type::after_future_changed);
2882     }
2883 
2884     /// Blocks until the task finishes. Throws any exceptions that the underlying
2885     /// functor throws. Should only be called if was_scheduled() is true,
2886     /// throws transwarp::control_error otherwise
2887     void get() const override {
2888         this->ensure_task_was_scheduled();
2889         this->future_.get();
2890     }
2891 
2892 protected:
2893 
2894     task_impl_proxy() = default;
2895 
2896     template<typename F>
2897     task_impl_proxy(F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2898     : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents)...)
2899     {}
2900 
2901     template<typename F, typename P>
2902     task_impl_proxy(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2903     : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents))
2904     {}
2905 
2906 };
2907 
2908 } // detail
2909 
2910 
2911 /// A task representing a piece of work given by functor and parent tasks.
2912 /// By connecting tasks a directed acyclic graph is built.
2913 /// Tasks should be created using the make_task factory functions.
2914 template<typename TaskType, typename Functor, typename... ParentResults>
2915 class task_impl : public transwarp::detail::task_impl_proxy<typename transwarp::detail::functor_result<TaskType, Functor, ParentResults...>::type, TaskType, Functor, ParentResults...> {
2916 public:
2917     /// The task type
2918     using task_type = TaskType;
2919 
2920     /// The result type of this task
2921     using result_type = typename transwarp::detail::functor_result<TaskType, Functor, ParentResults...>::type;
2922 
2923     /// A task is defined by functor and parent tasks.
2924     /// Note: Don't use this constructor directly, use transwarp::make_task
2925     template<typename F>
2926     task_impl(F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2927     : transwarp::detail::task_impl_proxy<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents)...)
2928     {}
2929 
2930     /// A task is defined by functor and parent tasks.
2931     /// Note: Don't use this constructor directly, use transwarp::make_task
2932     template<typename F, typename P>
2933     task_impl(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2934     : transwarp::detail::task_impl_proxy<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents))
2935     {}
2936 
2937     // delete copy/move semantics
2938     task_impl(const task_impl&) = delete;
2939     task_impl& operator=(const task_impl&) = delete;
2940     task_impl(task_impl&&) = delete;
2941     task_impl& operator=(task_impl&&) = delete;
2942 
2943     /// Gives this task a name and returns a ptr to itself
2944     std::shared_ptr<task_impl> named(std::string name) {
2945 #ifndef TRANSWARP_DISABLE_TASK_NAME
2946 #ifdef TRANSWARP_CPP11
2947         this->set_name(transwarp::option_str{std::move(name)});
2948 #else
2949         this->set_name(std::make_optional(std::move(name)));
2950 #endif
2951 #else
2952         (void)name;
2953 #endif
2954         return std::static_pointer_cast<task_impl>(this->shared_from_this());
2955     }
2956 
2957     /// Creates a continuation to this task
2958     template<typename TaskType_, typename Functor_>
2959     auto then(TaskType_, Functor_&& functor) -> std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>> {
2960         using task_t = transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>;
2961         return std::shared_ptr<task_t>(new task_t(std::forward<Functor_>(functor), std::static_pointer_cast<task_impl>(this->shared_from_this())));
2962     }
2963 
2964     /// Clones this task and casts the result to a ptr to task_impl
2965     std::shared_ptr<task_impl> clone_cast() const {
2966         return std::static_pointer_cast<task_impl>(this->clone());
2967     }
2968 
2969 private:
2970 
2971     task_impl() = default;
2972 
2973     std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const override {
2974         auto t = std::shared_ptr<task_impl>{new task_impl};
2975         t->copy_from(*this);
2976         t->level_ = this->level_;
2977         t->type_ = this->type_;
2978         t->executor_ = this->executor_;
2979         t->canceled_ = this->canceled_.load();
2980         t->schedule_mode_ = this->schedule_mode_;
2981 #ifndef TRANSWARP_DISABLE_TASK_TIME
2982         t->avg_idletime_us_ = this->avg_idletime_us_.load();
2983         t->avg_waittime_us_ = this->avg_waittime_us_.load();
2984         t->avg_runtime_us_ = this->avg_runtime_us_.load();
2985 #endif
2986         t->functor_.reset(new Functor(*this->functor_));
2987         t->parents_ = transwarp::detail::parents<ParentResults...>::clone(task_cache, this->parents_);
2988         t->executor_ = this->executor_;
2989 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2990         t->childcount_ = this->childcount_;
2991 #endif
2992         return t;
2993     }
2994 
2995 };
2996 
2997 
2998 /// A value task that stores a single value and doesn't require scheduling.
2999 /// Value tasks should be created using the make_value_task factory functions.
3000 template<typename ResultType>
3001 class value_task : public transwarp::detail::task_common<ResultType> {
3002 public:
3003     /// The task type
3004     using task_type = transwarp::root_type;
3005 
3006     /// The result type of this task
3007     using result_type = ResultType;
3008 
3009     /// A value task is defined by a given value.
3010     /// Note: Don't use this constructor directly, use transwarp::make_value_task
3011     template<typename T>
3012     value_task(T&& value)
3013     {
3014         this->future_ = transwarp::detail::make_future_with_value<result_type>(std::forward<T>(value));
3015         this->tasks_.reset(new typename transwarp::detail::task_common<result_type>::tasks_t{this});
3016     }
3017 
    // Value tasks can be neither copied nor moved:
    // all four copy/move operations are deleted
    value_task(const value_task&) = delete;
    value_task& operator=(const value_task&) = delete;
    value_task(value_task&&) = delete;
    value_task& operator=(value_task&&) = delete;
3023 
3024     /// Gives this task a name and returns a ptr to itself
3025     std::shared_ptr<value_task> named(std::string name) {
3026 #ifndef TRANSWARP_DISABLE_TASK_NAME
3027 #ifdef TRANSWARP_CPP11
3028         this->set_name(transwarp::option_str{std::move(name)});
3029 #else
3030         this->set_name(std::make_optional(std::move(name)));
3031 #endif
3032 #else
3033         (void)name;
3034 #endif
3035         return std::static_pointer_cast<value_task>(this->shared_from_this());
3036     }
3037 
3038     /// Creates a continuation to this task
3039     template<typename TaskType_, typename Functor_>
3040     auto then(TaskType_, Functor_&& functor) -> std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>> {
3041         using task_t = transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>;
3042         return std::shared_ptr<task_t>(new task_t(std::forward<Functor_>(functor), std::static_pointer_cast<value_task>(this->shared_from_this())));
3043     }
3044 
3045     /// Clones this task and casts the result to a ptr to value_task
3046     std::shared_ptr<value_task> clone_cast() const {
3047         return std::static_pointer_cast<value_task>(this->clone());
3048     }
3049 
3050     /// Nothing to be done to finalize a value task
3051     void finalize() override {}
3052 
3053     /// The task's level
3054     std::size_t level() const noexcept override {
3055         return 0;
3056     }
3057 
3058     /// The task's type
3059     transwarp::task_type type() const noexcept override {
3060         return transwarp::task_type::root;
3061     }
3062 
3063     /// Value tasks don't have executors as they don't run
3064     std::shared_ptr<transwarp::executor> executor() const noexcept override {
3065         return nullptr;
3066     }
3067 
3068     /// Value tasks cannot be canceled
3069     bool canceled() const noexcept override {
3070         return false;
3071     }
3072 
3073     /// Returns -1 as value tasks don't run
3074     std::int64_t avg_idletime_us() const noexcept override {
3075         return -1;
3076     }
3077 
3078     /// Returns -1 as value tasks don't run
3079     std::int64_t avg_waittime_us() const noexcept override {
3080         return -1;
3081     }
3082 
3083     /// Returns -1 as value tasks don't run
3084     std::int64_t avg_runtime_us() const noexcept override {
3085         return -1;
3086     }
3087 
3088     /// No-op because a value task never runs
3089     void set_executor(std::shared_ptr<transwarp::executor>) override {}
3090 
3091     /// No-op because a value task never runs and doesn't have parents
3092     void set_executor_all(std::shared_ptr<transwarp::executor>) override {}
3093 
3094     /// No-op because a value task never runs
3095     void remove_executor() override {}
3096 
3097     /// No-op because a value task never runs and doesn't have parents
3098     void remove_executor_all() override {}
3099 
3100     /// Sets a priority to all tasks (defaults to 0). transwarp will not directly use this.
3101     /// This is only useful if something else is using the priority
3102     void set_priority_all(std::int64_t priority) override {
3103 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
3104         this->set_priority(priority);
3105 #else
3106         (void)priority;
3107 #endif
3108     }
3109 
3110     /// Resets the priority of all tasks to 0
3111     void reset_priority_all() override {
3112 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
3113         this->reset_priority();
3114 #endif
3115     }
3116 
3117     /// Assigns custom data to all tasks. transwarp will not directly use this.
3118     /// This is only useful if something else is using this custom data
3119     void set_custom_data_all(transwarp::any_data custom_data) override {
3120 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
3121         this->set_custom_data(std::move(custom_data));
3122 #else
3123         (void)custom_data;
3124 #endif
3125     }
3126 
3127     /// Removes custom data from all tasks
3128     void remove_custom_data_all() override {
3129 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
3130         this->remove_custom_data();
3131 #endif
3132     }
3133 
3134     /// No-op because a value task never runs
3135     void schedule() override {}
3136 
3137     /// No-op because a value task never runs
3138     void schedule(transwarp::executor&) override {}
3139 
3140     /// No-op because a value task never runs
3141     void schedule(bool) override {}
3142 
3143     /// No-op because a value task never runs
3144     void schedule(transwarp::executor&, bool) override {}
3145 
3146     /// No-op because a value task never runs and doesn't have parents
3147     void schedule_all() override {}
3148 
3149     /// No-op because a value task never runs and doesn't have parents
3150     void schedule_all(transwarp::executor&) override {}
3151 
3152     /// No-op because a value task never runs and doesn't have parents
3153     void schedule_all(bool) override {}
3154 
3155     /// No-op because a value task never runs and doesn't have parents
3156     void schedule_all(transwarp::executor&, bool) override {}
3157 
3158     /// Assigns a value to this task
3159     void set_value(const typename transwarp::decay<result_type>::type& value) override {
3160         this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
3161         this->raise_event(transwarp::event_type::after_future_changed);
3162     }
3163 
3164     /// Assigns a value to this task
3165     void set_value(typename transwarp::decay<result_type>::type&& value) override {
3166         this->future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
3167         this->raise_event(transwarp::event_type::after_future_changed);
3168     }
3169 
3170     /// Assigns an exception to this task
3171     void set_exception(std::exception_ptr exception) override {
3172         this->future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
3173         this->raise_event(transwarp::event_type::after_future_changed);
3174     }
3175 
3176     /// Returns the value of this task. Throws an exception if this task has an exception assigned to it
3177     typename transwarp::result<result_type>::type get() const override {
3178         return this->future_.get();
3179     }
3180 
3181     /// Returns true because a value task is scheduled once on construction
3182     bool was_scheduled() const noexcept override {
3183         return true;
3184     }
3185 
3186     /// No-op because a value task never runs
3187     void wait() const override {}
3188 
3189     /// Returns true because a value task is always ready
3190     bool is_ready() const override {
3191         return true;
3192     }
3193 
3194     /// Returns true because a value task always contains a result
3195     bool has_result() const noexcept override {
3196         return true;
3197     }
3198 
3199     /// No-op because a value task never runs
3200     void reset() override {}
3201 
3202     /// No-op because a value task never runs and doesn't have parents
3203     void reset_all() override {}
3204 
3205     /// No-op because a value task never runs
3206     void cancel(bool) noexcept override {}
3207 
3208     /// No-op because a value task never runs and doesn't have parents
3209     void cancel_all(bool) noexcept override {}
3210 
3211     /// Adds a new listener for all event types and for all parents
3212     void add_listener_all(std::shared_ptr<transwarp::listener> listener) override {
3213         this->add_listener(listener);
3214     }
3215 
3216     /// Adds a new listener for the given event type only and for all parents
3217     void add_listener_all(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) override {
3218         this->add_listener(event, listener);
3219     }
3220 
3221     /// Removes the listener for all event types and for all parents
3222     void remove_listener_all(const std::shared_ptr<transwarp::listener>& listener) override {
3223         this->remove_listener(listener);
3224     }
3225 
3226     /// Removes the listener for the given event type only and for all parents
3227     void remove_listener_all(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) override {
3228         this->remove_listener(event, listener);
3229     }
3230 
3231     /// Removes all listeners and for all parents
3232     void remove_listeners_all() override {
3233         this->remove_listeners();
3234     }
3235 
3236     /// Removes all listeners for the given event type and for all parents
3237     void remove_listeners_all(transwarp::event_type event) override {
3238         this->remove_listeners(event);
3239     }
3240 
3241     /// Empty because a value task doesn't have parents
3242     std::vector<transwarp::itask*> parents() const override {
3243         return {};
3244     }
3245 
3246     /// Returns all tasks in the graph in breadth order
3247     const std::vector<transwarp::itask*>& tasks() override {
3248         return *this->tasks_;
3249     }
3250 
3251     /// Returns empty edges because a value task doesn't have parents
3252     std::vector<transwarp::edge> edges() override {
3253         return {};
3254     }
3255 
3256 private:
3257 
3258     value_task()
3259     {
3260         this->tasks_.reset(new typename transwarp::detail::task_common<result_type>::tasks_t{this});
3261     }
3262 
3263     std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>&) const override {
3264         auto t = std::shared_ptr<value_task>(new value_task);
3265         t->copy_from(*this);
3266         return t;
3267     }
3268 
3269     /// No-op as value tasks are always at level 0
3270     void set_level(std::size_t) noexcept override {}
3271 
3272     /// No-op as value tasks are always root tasks
3273     void set_type(transwarp::task_type) noexcept override {}
3274 
3275     /// No-op as value tasks don't run
3276     void set_avg_idletime_us(std::int64_t) noexcept override {}
3277 
3278     /// No-op as value tasks don't run
3279     void set_avg_waittime_us(std::int64_t) noexcept override {}
3280 
3281     /// No-op as value tasks don't run
3282     void set_avg_runtime_us(std::int64_t) noexcept override {}
3283 
3284     /// No-op because a value task never runs
3285     void schedule_impl(bool, transwarp::executor*) override {}
3286 
3287     /// Visits this task
3288     void visit(const std::function<void(transwarp::itask&)>& visitor) override {
3289         if (!this->visited_) {
3290             visitor(*this);
3291             this->visited_ = true;
3292         }
3293     }
3294 
3295     /// Marks this task as not visited
3296     void unvisit() noexcept override {
3297         this->visited_ = false;
3298     }
3299 
3300     void increment_childcount() noexcept override {}
3301 
3302     void decrement_refcount() override {}
3303 
3304     void reset_future() override {}
3305 
3306 };
3307 
3308 
3309 /// A factory function to create a new task
3310 template<typename TaskType, typename Functor, typename... Parents>
3311 auto make_task(TaskType, Functor&& functor, std::shared_ptr<Parents>... parents) -> std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, typename Parents::result_type...>> {
3312     using task_t = transwarp::task_impl<TaskType, typename std::decay<Functor>::type, typename Parents::result_type...>;
3313     return std::shared_ptr<task_t>(new task_t(std::forward<Functor>(functor), std::move(parents)...));
3314 }
3315 
3316 
3317 /// A factory function to create a new task with vector parents
3318 template<typename TaskType, typename Functor, typename ParentType>
3319 auto make_task(TaskType, Functor&& functor, std::vector<ParentType> parents) -> std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>> {
3320     using task_t = transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>;
3321     return std::shared_ptr<task_t>(new task_t(std::forward<Functor>(functor), std::move(parents)));
3322 }
3323 
3324 
3325 /// A factory function to create a new value task
3326 template<typename Value>
3327 auto make_value_task(Value&& value) -> std::shared_ptr<transwarp::value_task<typename transwarp::decay<Value>::type>> {
3328     using task_t = transwarp::value_task<typename transwarp::decay<Value>::type>;
3329     return std::shared_ptr<task_t>(new task_t(std::forward<Value>(value)));
3330 }
3331 
3332 
3333 /// A function similar to std::for_each but returning a transwarp task for
3334 /// deferred, possibly asynchronous execution. This function creates a graph
3335 /// with std::distance(first, last) root tasks
3336 template<typename InputIt, typename UnaryOperation>
3337 auto for_each(InputIt first, InputIt last, UnaryOperation unary_op) -> std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>> {
3338     const auto distance = std::distance(first, last);
3339     if (distance <= 0) {
3340         throw transwarp::invalid_parameter{"first or last"};
3341     }
3342     std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
3343     tasks.reserve(static_cast<std::size_t>(distance));
3344     for (; first != last; ++first) {
3345         tasks.push_back(transwarp::make_task(transwarp::root, [unary_op,first]{ unary_op(*first); }));
3346     }
3347     return transwarp::make_task(transwarp::wait, transwarp::no_op, tasks);
3348 }
3349 
3350 /// A function similar to std::for_each but returning a transwarp task for
3351 /// deferred, possibly asynchronous execution. This function creates a graph
3352 /// with std::distance(first, last) root tasks.
3353 /// Overload for automatic scheduling by passing an executor.
3354 template<typename InputIt, typename UnaryOperation>
3355 auto for_each(transwarp::executor& executor, InputIt first, InputIt last, UnaryOperation unary_op) -> std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>> {
3356     auto task = transwarp::for_each(first, last, unary_op);
3357     task->schedule_all(executor);
3358     return task;
3359 }
3360 
3361 
3362 /// A function similar to std::transform but returning a transwarp task for
3363 /// deferred, possibly asynchronous execution. This function creates a graph
3364 /// with std::distance(first1, last1) root tasks
3365 template<typename InputIt, typename OutputIt, typename UnaryOperation>
3366 auto transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) -> std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>> {
3367     const auto distance = std::distance(first1, last1);
3368     if (distance <= 0) {
3369         throw transwarp::invalid_parameter{"first1 or last1"};
3370     }
3371     std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
3372     tasks.reserve(static_cast<std::size_t>(distance));
3373     for (; first1 != last1; ++first1, ++d_first) {
3374         tasks.push_back(transwarp::make_task(transwarp::root, [unary_op,first1,d_first]{ *d_first = unary_op(*first1); }));
3375     }
3376     return transwarp::make_task(transwarp::wait, transwarp::no_op, tasks);
3377 }
3378 
3379 /// A function similar to std::transform but returning a transwarp task for
3380 /// deferred, possibly asynchronous execution. This function creates a graph
3381 /// with std::distance(first1, last1) root tasks.
3382 /// Overload for automatic scheduling by passing an executor.
3383 template<typename InputIt, typename OutputIt, typename UnaryOperation>
3384 auto transform(transwarp::executor& executor, InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) -> std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>> {
3385     auto task = transwarp::transform(first1, last1, d_first, unary_op);
3386     task->schedule_all(executor);
3387     return task;
3388 }
3389 
3390 
3391 /// A task pool that allows running multiple instances of the same task in parallel.
3392 template<typename ResultType>
3393 class task_pool {
3394 public:
3395 
3396     /// Constructs a task pool
3397     task_pool(std::shared_ptr<transwarp::task<ResultType>> task,
3398               std::size_t minimum_size,
3399               std::size_t maximum_size)
3400     : task_(std::move(task)),
3401       minimum_(minimum_size),
3402       maximum_(maximum_size),
3403       finished_(maximum_size)
3404     {
3405         if (minimum_ < 1) {
3406             throw transwarp::invalid_parameter{"minimum size"};
3407         }
3408         if (minimum_ > maximum_) {
3409             throw transwarp::invalid_parameter{"minimum or maximum size"};
3410         }
3411         task_->add_listener(transwarp::event_type::after_finished, listener_);
3412         for (std::size_t i=0; i<minimum_; ++i) {
3413             idle_.push(task_->clone());
3414         }
3415     }
3416 
3417     /// Constructs a task pool with reasonable defaults for minimum and maximum
3418     explicit
3419     task_pool(std::shared_ptr<transwarp::task<ResultType>> task)
3420     : task_pool(std::move(task), 32, 65536)
3421     {}
3422 
3423     // delete copy/move semantics
3424     task_pool(const task_pool&) = delete;
3425     task_pool& operator=(const task_pool&) = delete;
3426     task_pool(task_pool&&) = delete;
3427     task_pool& operator=(task_pool&&) = delete;
3428 
3429     /// Returns the next idle task.
3430     /// If there are no idle tasks then it will attempt to double the
3431     /// pool size. If that fails then it will return a nullptr. On successful
3432     /// retrieval of an idle task the function will mark that task as busy.
3433     std::shared_ptr<transwarp::task<ResultType>> next_task(bool maybe_resize=true) {
3434         const transwarp::itask* finished_task{};
3435         {
3436             std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3437             if (!finished_.empty()) {
3438                 finished_task = finished_.front(); finished_.pop();
3439             }
3440         }
3441 
3442         std::shared_ptr<transwarp::task<ResultType>> task;
3443         if (finished_task) {
3444             task = busy_.find(finished_task)->second;
3445         } else {
3446             if (maybe_resize && idle_.empty()) {
3447                 resize(size() * 2); // double pool size
3448             }
3449             if (idle_.empty()) {
3450                 return nullptr;
3451             }
3452             task = idle_.front(); idle_.pop();
3453             busy_.emplace(task.get(), task);
3454         }
3455 
3456         auto future = task->future();
3457         if (future.valid()) {
3458             future.wait(); // will return immediately
3459         }
3460         return task;
3461     }
3462 
3463     /// Just like next_task() but waits for a task to become available.
3464     /// The returned task will always be a valid pointer
3465     std::shared_ptr<transwarp::task<ResultType>> wait_for_next_task(bool maybe_resize=true) {
3466         for (;;) {
3467             std::shared_ptr<transwarp::task<ResultType>> g = next_task(maybe_resize);
3468             if (g) {
3469                 return g;
3470             }
3471         }
3472     }
3473 
3474     /// Returns the current total size of the pool (sum of idle and busy tasks)
3475     std::size_t size() const {
3476         return idle_.size() + busy_.size();
3477     }
3478 
3479     /// Returns the minimum size of the pool
3480     std::size_t minimum_size() const {
3481         return minimum_;
3482     }
3483 
3484     /// Returns the maximum size of the pool
3485     std::size_t maximum_size() const {
3486         return maximum_;
3487     }
3488 
3489     /// Returns the number of idle tasks in the pool
3490     std::size_t idle_count() const {
3491         std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3492         return idle_.size() + finished_.size();
3493     }
3494 
3495     /// Returns the number of busy tasks in the pool
3496     std::size_t busy_count() const {
3497         std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3498         return busy_.size() - finished_.size();
3499     }
3500 
3501     /// Resizes the task pool to the given new size if possible
3502     void resize(std::size_t new_size) {
3503         reclaim();
3504         if (new_size > size()) { // grow
3505             const std::size_t count = new_size - size();
3506             for (std::size_t i=0; i<count; ++i) {
3507                 if (size() == maximum_) {
3508                     break;
3509                 }
3510                 idle_.push(task_->clone());
3511             }
3512         } else if (new_size < size()) { // shrink
3513             const std::size_t count = size() - new_size;
3514             for (std::size_t i=0; i<count; ++i) {
3515                 if (idle_.empty() || size() == minimum_) {
3516                     break;
3517                 }
3518                 idle_.pop();
3519             }
3520         }
3521     }
3522 
3523     /// Reclaims finished tasks by marking them as idle again
3524     void reclaim() {
3525         decltype(finished_) finished{finished_.capacity()};
3526         {
3527             std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3528             finished_.swap(finished);
3529         }
3530         while (!finished.empty()) {
3531             const transwarp::itask* task = finished.front(); finished.pop();
3532             const auto it = busy_.find(task);
3533             idle_.push(it->second);
3534             busy_.erase(it);
3535         }
3536     }
3537 
3538 private:
3539 
3540     class finished_listener : public transwarp::listener {
3541     public:
3542 
3543         explicit
3544         finished_listener(task_pool<ResultType>& pool)
3545         : pool_(pool)
3546         {}
3547 
3548         // Called on a potentially high-priority thread
3549         void handle_event(transwarp::event_type, transwarp::itask& task) override {
3550             std::lock_guard<transwarp::detail::spinlock> lock{pool_.spinlock_};
3551             pool_.finished_.push(static_cast<const transwarp::itask*>(&task));
3552         }
3553 
3554     private:
3555         task_pool<ResultType>& pool_;
3556     };
3557 
3558     std::shared_ptr<transwarp::task<ResultType>> task_;
3559     std::size_t minimum_;
3560     std::size_t maximum_;
3561     mutable transwarp::detail::spinlock spinlock_; // protecting finished_
3562     transwarp::detail::circular_buffer<const transwarp::itask*> finished_;
3563     std::queue<std::shared_ptr<transwarp::task<ResultType>>> idle_;
3564     std::unordered_map<const transwarp::itask*, std::shared_ptr<transwarp::task<ResultType>>> busy_;
3565     std::shared_ptr<transwarp::listener> listener_{new finished_listener(*this)};
3566 };
3567 
3568 
3569 /// A timer that tracks the average idle, wait, and run time of each task it listens to.
3570 /// - idle = time between scheduling and starting the task (executor dependent)
3571 /// - wait = time between starting and invoking the task's functor, i.e. wait for parent tasks to finish
3572 /// - run = time between invoking and finishing the task's computations
3573 class timer : public transwarp::listener {
3574 public:
3575     timer() = default;
3576 
3577     // delete copy/move semantics
3578     timer(const timer&) = delete;
3579     timer& operator=(const timer&) = delete;
3580     timer(timer&&) = delete;
3581     timer& operator=(timer&&) = delete;
3582 
3583     /// Performs the actual timing and populates the task's timing members
3584     void handle_event(const transwarp::event_type event, transwarp::itask& task) override {
3585         switch (event) {
3586         case transwarp::event_type::before_scheduled: {
3587             const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3588             std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3589             auto& track = tracks_[&task];
3590             track.startidle = now;
3591         }
3592         break;
3593         case transwarp::event_type::before_started: {
3594             const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3595             track_idletime(task, now);
3596             std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3597             auto& track = tracks_[&task];
3598             track.startwait = now;
3599         }
3600         break;
3601         case transwarp::event_type::after_canceled: {
3602             const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3603             track_waittime(task, now);
3604         }
3605         break;
3606         case transwarp::event_type::before_invoked: {
3607             const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3608             track_waittime(task, now);
3609             std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3610             auto& track = tracks_[&task];
3611             track.running = true;
3612             track.startrun = now;
3613         }
3614         break;
3615         case transwarp::event_type::after_finished: {
3616             const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3617             track_runtime(task, now);
3618         }
3619         break;
3620         default: break;
3621         }
3622     }
3623 
3624     /// Resets all timing information
3625     void reset() {
3626         std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3627         tracks_.clear();
3628     }
3629 
3630 private:
3631 
3632     void track_idletime(transwarp::itask& task, const std::chrono::time_point<std::chrono::steady_clock>& now) {
3633         std::int64_t avg_idletime_us;
3634         {
3635             std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3636             auto& track = tracks_[&task];
3637             track.idletime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startidle).count();
3638             ++track.idlecount;
3639             avg_idletime_us = static_cast<std::int64_t>(track.idletime / track.idlecount);
3640         }
3641         task.set_avg_idletime_us(avg_idletime_us);
3642     }
3643 
3644     void track_waittime(transwarp::itask& task, const std::chrono::time_point<std::chrono::steady_clock>& now) {
3645         std::int64_t avg_waittime_us;
3646         {
3647             std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3648             auto& track = tracks_[&task];
3649             track.waittime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startwait).count();
3650             ++track.waitcount;
3651             avg_waittime_us = static_cast<std::int64_t>(track.waittime / track.waitcount);
3652         }
3653         task.set_avg_waittime_us(avg_waittime_us);
3654     }
3655 
3656     void track_runtime(transwarp::itask& task, const std::chrono::time_point<std::chrono::steady_clock>& now) {
3657         std::int64_t avg_runtime_us;
3658         {
3659             std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3660             auto& track = tracks_[&task];
3661             if (!track.running) {
3662                 return;
3663             }
3664             track.running = false;
3665             track.runtime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startrun).count();
3666             ++track.runcount;
3667             avg_runtime_us = static_cast<std::int64_t>(track.runtime / track.runcount);
3668         }
3669         task.set_avg_runtime_us(avg_runtime_us);
3670     }
3671 
3672     struct track {
3673         bool running = false;
3674         std::chrono::time_point<std::chrono::steady_clock> startidle;
3675         std::chrono::time_point<std::chrono::steady_clock> startwait;
3676         std::chrono::time_point<std::chrono::steady_clock> startrun;
3677         std::chrono::microseconds::rep idletime = 0;
3678         std::chrono::microseconds::rep idlecount = 0;
3679         std::chrono::microseconds::rep waittime = 0;
3680         std::chrono::microseconds::rep waitcount = 0;
3681         std::chrono::microseconds::rep runtime = 0;
3682         std::chrono::microseconds::rep runcount = 0;
3683     };
3684 
3685     transwarp::detail::spinlock spinlock_; // protecting tracks_
3686     std::unordered_map<const transwarp::itask*, track> tracks_;
3687 };
3688 
3689 
3690 /// The releaser will release a task's future when the task's `after_satisfied`
3691 /// event was received which happens when all children received the task's result.
3692 /// The releaser should be used in cases where the task's result is only needed
3693 /// for consumption by its children and can then be discarded.
3694 class releaser : public transwarp::listener {
3695 public:
3696     releaser() = default;
3697 
3698     /// The executor gives control over where a task's future is released
3699     explicit releaser(std::shared_ptr<transwarp::executor> executor)
3700     : executor_(std::move(executor))
3701     {}
3702 
3703     // delete copy/move semantics
3704     releaser(const releaser&) = delete;
3705     releaser& operator=(const releaser&) = delete;
3706     releaser(releaser&&) = delete;
3707     releaser& operator=(releaser&&) = delete;
3708 
3709     void handle_event(const transwarp::event_type event, transwarp::itask& task) override {
3710         if (event == transwarp::event_type::after_satisfied) {
3711             if (executor_) {
3712                 executor_->execute([&task]{ task.reset_future(); }, task);
3713             } else {
3714                 task.reset_future();
3715             }
3716         }
3717     }
3718 
3719 private:
3720     std::shared_ptr<transwarp::executor> executor_;
3721 };
3722 
3723 
3724 } // transwarp
3725