1 /// @mainpage transwarp is a header-only C++ library for task concurrency
2 /// @details https://github.com/bloomen/transwarp
3 /// @version 2.2.2
4 /// @author Christian Blume, Guan Wang
5 /// @date 2019
6 /// @copyright MIT http://www.opensource.org/licenses/mit-license.php
7 #pragma once
#include <algorithm>
#ifndef TRANSWARP_CPP11
#include <any>
#endif
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#ifndef TRANSWARP_CPP11
#include <optional>
#endif
#include <queue>
#include <stdexcept>
#include <string>
#ifndef TRANSWARP_CPP11
#include <string_view>
#endif
#include <thread>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
34
35
// TRANSWARP_MINIMUM_TASK_SIZE is a convenience switch: when it is defined,
// each of the TRANSWARP_DISABLE_TASK_* macros below gets defined as well,
// unless the user has already defined it.
#if defined(TRANSWARP_MINIMUM_TASK_SIZE)

#if !defined(TRANSWARP_DISABLE_TASK_CUSTOM_DATA)
#define TRANSWARP_DISABLE_TASK_CUSTOM_DATA
#endif

#if !defined(TRANSWARP_DISABLE_TASK_NAME)
#define TRANSWARP_DISABLE_TASK_NAME
#endif

#if !defined(TRANSWARP_DISABLE_TASK_PRIORITY)
#define TRANSWARP_DISABLE_TASK_PRIORITY
#endif

#if !defined(TRANSWARP_DISABLE_TASK_REFCOUNT)
#define TRANSWARP_DISABLE_TASK_REFCOUNT
#endif

#if !defined(TRANSWARP_DISABLE_TASK_TIME)
#define TRANSWARP_DISABLE_TASK_TIME
#endif

#endif // TRANSWARP_MINIMUM_TASK_SIZE
59
60
61 /// The transwarp namespace
62 namespace transwarp {
63
64
65 #ifdef TRANSWARP_CPP11
/// A small value type that either holds a string or holds nothing.
/// A default-constructed object is empty; an object constructed from a
/// string is engaged, even when that string is empty.
class option_str {
public:
    /// Creates an empty (disengaged) object
    option_str()
    : text_(),
      engaged_(false)
    {}

    /// Creates an engaged object that takes ownership of str
    option_str(std::string str)
    : text_(std::move(str)),
      engaged_(true)
    {}

    // default copy/move semantics
    option_str(const option_str&) = default;
    option_str& operator=(const option_str&) = default;
    option_str(option_str&&) = default;
    option_str& operator=(option_str&&) = default;

    /// True if a string is held
    operator bool() const noexcept {
        return engaged_;
    }

    /// Returns the stored string (an empty string if nothing is held)
    const std::string& operator*() const noexcept {
        return text_;
    }

private:
    std::string text_;
    bool engaged_;
};
94
/// Detail namespace for internal functionality only
namespace detail {

/// Type-erased operations on a heap-allocated value that the owner only
/// knows through a void pointer
class storage {
public:
    virtual ~storage() = default;
    /// Returns a new storage object handling the same value type
    virtual std::unique_ptr<storage> clone() const = 0;
    /// Deletes the value that data points to
    virtual void destroy(void* data) const noexcept = 0;
    /// Copy-constructs a new heap value from src and writes its address to dest
    virtual void copy(const void* src, void*& dest) const = 0;
    /// Hands the pointer in src over to dest and nulls src
    virtual void move(void*& src, void*& dest) const noexcept = 0;
};

/// Implements the storage operations for values of the concrete type T
template<typename T>
class storage_impl : public storage {
public:
    std::unique_ptr<storage> clone() const override {
        return std::unique_ptr<storage>(new storage_impl);
    }
    void destroy(void* data) const noexcept override {
        // data was created via `new T`, so static_cast restores the exact type
        delete static_cast<T*>(data);
    }
    void copy(const void* src, void*& dest) const override {
        dest = new T(*static_cast<const T*>(src));
    }
    void move(void*& src, void*& dest) const noexcept override {
        dest = src;
        src = nullptr;
    }
};

} // detail

/// A simple value class that can hold any copy-constructible value.
/// Invariant: storage_ and data_ are either both null (empty) or both set.
class any_data {
public:
    /// Creates an empty object
    any_data() noexcept
    : data_(nullptr)
    {}

    /// Stores a decayed copy of value (moved from if value is an rvalue).
    /// Disabled when the decayed T is any_data itself: otherwise this
    /// constructor would be a better match than the copy constructor for
    /// non-const lvalues and would try to wrap an any_data inside an
    /// any_data, recursing without end.
    template<typename T,
             typename = typename std::enable_if<
                 !std::is_same<typename std::decay<T>::type, any_data>::value
             >::type>
    any_data(T&& value)
    : storage_(new detail::storage_impl<typename std::decay<T>::type>),
      data_(new typename std::decay<T>::type(std::forward<T>(value)))
    {}

    /// Deep-copies the value held by other (if any)
    any_data(const any_data& other)
    : storage_(other.storage_ ? other.storage_->clone() : nullptr),
      data_(nullptr)
    {
        if (other.data_) {
            storage_->copy(other.data_, data_);
        }
    }

    /// Deep-copies the value held by other. The copy is made before the
    /// current value is released, so *this is unchanged if copying throws.
    any_data& operator=(const any_data& other) {
        if (this != &other) {
            any_data tmp(other);
            *this = std::move(tmp);
        }
        return *this;
    }

    /// Takes over the value of other and leaves other empty
    any_data(any_data&& other) noexcept
    : storage_(std::move(other.storage_)),
      data_(nullptr)
    {
        if (other.data_) {
            storage_->move(other.data_, data_);
        }
    }

    /// Releases the current value, takes over the value of other and leaves
    /// other empty
    any_data& operator=(any_data&& other) noexcept {
        if (this != &other) {
            reset();
            storage_ = std::move(other.storage_);
            if (other.data_) {
                storage_->move(other.data_, data_);
            }
        }
        return *this;
    }

    ~any_data() {
        reset();
    }

    /// True if a value is held
    bool has_value() const noexcept {
        return data_ != nullptr;
    }

    /// Returns the held value as a T. No type check is performed: T must be
    /// the (decayed) type the value was stored with and a value must be held.
    template<typename T>
    const T& get() const {
        return *static_cast<const T*>(data_);
    }

private:
    /// Destroys the held value (if any) and returns to the empty state
    void reset() noexcept {
        if (data_) {
            storage_->destroy(data_);
            data_ = nullptr;
        }
        storage_.reset();
    }

    std::unique_ptr<detail::storage> storage_;
    void* data_;
};
211
// With TRANSWARP_CPP11 defined a string view is simply a reference to a constant std::string
using str_view = const std::string&;
#else
// Without TRANSWARP_CPP11 the standard library types are used directly
using any_data = std::any;
using option_str = std::optional<std::string>;
using str_view = std::string_view;
#endif
218
219
/// The possible task types.
/// Each enumerator has a matching tag type and a matching branch in
/// to_string(const task_type&)
enum class task_type {
    root,        ///< The task has no parents
    accept,      ///< The task's functor accepts all parent futures
    accept_any,  ///< The task's functor accepts the first parent future that becomes ready
    consume,     ///< The task's functor consumes all parent results
    consume_any, ///< The task's functor consumes the first parent result that becomes ready
    wait,        ///< The task's functor takes no arguments but waits for all parents to finish
    wait_any,    ///< The task's functor takes no arguments but waits for the first parent to finish
};
230
231
/// Common base of all exceptions declared in this header.
/// what() returns the message passed to the constructor.
class transwarp_error : public std::runtime_error {
public:
    explicit transwarp_error(const std::string& message)
    : std::runtime_error(message)
    {}
};
239
240
241 /// Exception thrown when a task is canceled
242 class task_canceled : public transwarp::transwarp_error {
243 public:
task_canceled(const std::string & task_repr)244 explicit task_canceled(const std::string& task_repr)
245 : transwarp::transwarp_error{"Task canceled: " + task_repr}
246 {}
247 };
248
249
250 /// Exception thrown when a task was destroyed prematurely
251 class task_destroyed : public transwarp::transwarp_error {
252 public:
task_destroyed(const std::string & task_repr)253 explicit task_destroyed(const std::string& task_repr)
254 : transwarp::transwarp_error{"Task destroyed: " + task_repr}
255 {}
256 };
257
258
259 /// Exception thrown when an invalid parameter was passed to a function
260 class invalid_parameter : public transwarp::transwarp_error {
261 public:
invalid_parameter(const std::string & parameter)262 explicit invalid_parameter(const std::string& parameter)
263 : transwarp::transwarp_error{"Invalid parameter: " + parameter}
264 {}
265 };
266
267
268 /// Exception thrown when a task is used in unintended ways
269 class control_error : public transwarp::transwarp_error {
270 public:
control_error(const std::string & message)271 explicit control_error(const std::string& message)
272 : transwarp::transwarp_error{"Control error: " + message}
273 {}
274 };
275
276
// Tag types and tag objects, one pair per transwarp::task_type enumerator.
// Every tag type derives from std::integral_constant, so the corresponding
// task_type is available at compile time as <tag_type>::value.

/// The root type. Used for tag dispatch
struct root_type : std::integral_constant<transwarp::task_type, transwarp::task_type::root> {};
constexpr transwarp::root_type root{}; ///< The root task tag

/// The accept type. Used for tag dispatch
struct accept_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept> {};
constexpr transwarp::accept_type accept{}; ///< The accept task tag

/// The accept_any type. Used for tag dispatch
struct accept_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept_any> {};
constexpr transwarp::accept_any_type accept_any{}; ///< The accept_any task tag

/// The consume type. Used for tag dispatch
struct consume_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume> {};
constexpr transwarp::consume_type consume{}; ///< The consume task tag

/// The consume_any type. Used for tag dispatch
struct consume_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume_any> {};
constexpr transwarp::consume_any_type consume_any{}; ///< The consume_any task tag

/// The wait type. Used for tag dispatch
struct wait_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait> {};
constexpr transwarp::wait_type wait{}; ///< The wait task tag

/// The wait_any type. Used for tag dispatch
struct wait_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait_any> {};
constexpr transwarp::wait_any_type wait_any{}; ///< The wait_any task tag
304
305
306 class itask;
307
308
309 /// Detail namespace for internal functionality only
310 namespace detail {
311
312 struct visit_visitor;
313 struct unvisit_visitor;
314 struct final_visitor;
315 struct schedule_visitor;
316 struct parent_visitor;
317 struct decrement_refcount_functor;
318
319 } // detail
320
321
/// The executor interface used to perform custom task execution.
/// Abstract: both name() and execute() are pure virtual and must be
/// provided by the implementer.
class executor {
public:
    virtual ~executor() = default;

    /// Returns the name of the executor
    virtual std::string name() const = 0;

    /// Runs a task which is wrapped by the given functor. The functor only
    /// captures one shared pointer and can hence be copied at low cost.
    /// task represents the task that the functor belongs to.
    /// This function is only ever called on the thread of the caller to schedule().
    /// The implementer needs to ensure that this never throws exceptions
    virtual void execute(const std::function<void()>& functor, transwarp::itask& task) = 0;
};
337
338
/// The task events that can be subscribed to using the listener interface
enum class event_type {
    before_scheduled,      ///< Just before a task is scheduled (handle_event called on thread of caller to schedule())
    after_future_changed,  ///< Just after the task's future was changed (handle_event called on thread that changed the task's future)
    before_started,        ///< Just before a task starts running (handle_event called on thread that task is run on)
    before_invoked,        ///< Just before a task's functor is invoked (handle_event called on thread that task is run on)
    after_finished,        ///< Just after a task has finished running (handle_event called on thread that task is run on)
    after_canceled,        ///< Just after a task was canceled (handle_event called on thread that task is run on)
    after_satisfied,       ///< Just after a task has satisfied all its children with results (handle_event called on thread where the last child is satisfied)
    after_custom_data_set, ///< Just after custom data was assigned (handle_event called on thread that custom data was set on)
    count,                 ///< Not an event: as the last enumerator its value equals the number of events listed above
};
351
352
/// The listener interface to listen to events raised by tasks.
/// Abstract: handle_event() is pure virtual and must be provided by the implementer.
class listener {
public:
    virtual ~listener() = default;

    /// This may be called from arbitrary threads depending on the event type (see transwarp::event_type).
    /// The implementer needs to ensure that this never throws exceptions.
    virtual void handle_event(transwarp::event_type event, transwarp::itask& task) = 0;
};
362
363
364 /// An edge between two tasks
365 class edge {
366 public:
edge(transwarp::itask & parent,transwarp::itask & child)367 edge(transwarp::itask& parent, transwarp::itask& child) noexcept
368 : parent_(&parent), child_(&child)
369 {}
370
371 // default copy/move semantics
372 edge(const edge&) = default;
373 edge& operator=(const edge&) = default;
374 edge(edge&&) = default;
375 edge& operator=(edge&&) = default;
376
377 /// Returns the parent task
parent()378 const transwarp::itask& parent() const noexcept {
379 return *parent_;
380 }
381
382 /// Returns the parent task
parent()383 transwarp::itask& parent() noexcept {
384 return *parent_;
385 }
386
387 /// Returns the child task
child()388 const transwarp::itask& child() const noexcept {
389 return *child_;
390 }
391
392 /// Returns the child task
child()393 transwarp::itask& child() noexcept {
394 return *child_;
395 }
396
397 private:
398 transwarp::itask* parent_;
399 transwarp::itask* child_;
400 };
401
402
403 class timer;
404 class releaser;
405
/// An interface for the task class.
/// Apart from the defaulted virtual destructor every member function is pure
/// virtual. The class derives from std::enable_shared_from_this<itask>.
class itask : public std::enable_shared_from_this<itask> {
public:
    virtual ~itask() = default;

    virtual void finalize() = 0;

    // Accessors (const and noexcept)
    virtual std::size_t id() const noexcept = 0;
    virtual std::size_t level() const noexcept = 0;
    virtual transwarp::task_type type() const noexcept = 0;
    virtual const transwarp::option_str& name() const noexcept = 0;
    virtual std::shared_ptr<transwarp::executor> executor() const noexcept = 0;
    virtual std::int64_t priority() const noexcept = 0;
    virtual const transwarp::any_data& custom_data() const noexcept = 0;
    virtual bool canceled() const noexcept = 0;
    // to_string(const itask&) only prints the three averages below when they are >= 0
    virtual std::int64_t avg_idletime_us() const noexcept = 0;
    virtual std::int64_t avg_waittime_us() const noexcept = 0;
    virtual std::int64_t avg_runtime_us() const noexcept = 0;

    // Executor, priority and custom data modifiers.
    // NOTE(review): the *_all variants presumably apply the operation to this
    // task and all tasks it depends on - confirm against the implementation.
    virtual void set_executor(std::shared_ptr<transwarp::executor> executor) = 0;
    virtual void set_executor_all(std::shared_ptr<transwarp::executor> executor) = 0;
    virtual void remove_executor() = 0;
    virtual void remove_executor_all() = 0;
    virtual void set_priority(std::int64_t priority) = 0;
    virtual void set_priority_all(std::int64_t priority) = 0;
    virtual void reset_priority() = 0;
    virtual void reset_priority_all() = 0;
    virtual void set_custom_data(transwarp::any_data custom_data) = 0;
    virtual void set_custom_data_all(transwarp::any_data custom_data) = 0;
    virtual void remove_custom_data() = 0;
    virtual void remove_custom_data_all() = 0;

    // Listener management; the overloads taking an event_type are restricted to that event
    virtual void add_listener(std::shared_ptr<transwarp::listener> listener) = 0;
    virtual void add_listener(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
    virtual void add_listener_all(std::shared_ptr<transwarp::listener> listener) = 0;
    virtual void add_listener_all(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
    virtual void remove_listener(const std::shared_ptr<transwarp::listener>& listener) = 0;
    virtual void remove_listener(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) = 0;
    virtual void remove_listener_all(const std::shared_ptr<transwarp::listener>& listener) = 0;
    virtual void remove_listener_all(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) = 0;
    virtual void remove_listeners() = 0;
    virtual void remove_listeners(transwarp::event_type event) = 0;
    virtual void remove_listeners_all() = 0;
    virtual void remove_listeners_all(transwarp::event_type event) = 0;

    // Scheduling overloads: with or without an explicit executor and reset flag
    virtual void schedule() = 0;
    virtual void schedule(transwarp::executor& executor) = 0;
    virtual void schedule(bool reset) = 0;
    virtual void schedule(transwarp::executor& executor, bool reset) = 0;
    virtual void schedule_all() = 0;
    virtual void schedule_all(transwarp::executor& executor) = 0;
    virtual void schedule_all(bool reset_all) = 0;
    virtual void schedule_all(transwarp::executor& executor, bool reset_all) = 0;

    // Result state, reset and cancellation
    virtual void set_exception(std::exception_ptr exception) = 0;
    virtual bool was_scheduled() const noexcept = 0;
    virtual void wait() const = 0;
    virtual bool is_ready() const = 0;
    virtual bool has_result() const = 0;
    virtual void reset() = 0;
    virtual void reset_all() = 0;
    virtual void cancel(bool enabled) noexcept = 0;
    virtual void cancel_all(bool enabled) noexcept = 0;

    // Graph inspection; pointers and edges refer to tasks without owning them
    virtual std::vector<itask*> parents() const = 0;
    virtual const std::vector<itask*>& tasks() = 0;
    virtual std::vector<transwarp::edge> edges() = 0;

protected:
    // Scheduling hook taking both parameters; executor defaults to nullptr
    virtual void schedule_impl(bool reset, transwarp::executor* executor=nullptr) = 0;

private:
    // The private hooks below are reachable only through these friends
    friend struct transwarp::detail::visit_visitor;
    friend struct transwarp::detail::unvisit_visitor;
    friend struct transwarp::detail::final_visitor;
    friend struct transwarp::detail::schedule_visitor;
    friend struct transwarp::detail::parent_visitor;
    friend class transwarp::timer;
    friend class transwarp::releaser;
    friend struct transwarp::detail::decrement_refcount_functor;

    virtual void visit(const std::function<void(itask&)>& visitor) = 0;
    virtual void unvisit() noexcept = 0;
    virtual void set_id(std::size_t id) noexcept = 0;
    virtual void set_level(std::size_t level) noexcept = 0;
    virtual void set_type(transwarp::task_type type) noexcept = 0;
    virtual void set_name(transwarp::option_str name) noexcept = 0;
    virtual void set_avg_idletime_us(std::int64_t idletime) noexcept = 0;
    virtual void set_avg_waittime_us(std::int64_t waittime) noexcept = 0;
    virtual void set_avg_runtime_us(std::int64_t runtime) noexcept = 0;
    virtual void increment_childcount() noexcept = 0;
    virtual void decrement_refcount() = 0;
    virtual void reset_future() = 0;
};
494
495
496 /// String conversion for the task_type enumeration
497 inline
to_string(const transwarp::task_type & type)498 std::string to_string(const transwarp::task_type& type) {
499 switch (type) {
500 case transwarp::task_type::root: return "root";
501 case transwarp::task_type::accept: return "accept";
502 case transwarp::task_type::accept_any: return "accept_any";
503 case transwarp::task_type::consume: return "consume";
504 case transwarp::task_type::consume_any: return "consume_any";
505 case transwarp::task_type::wait: return "wait";
506 case transwarp::task_type::wait_any: return "wait_any";
507 }
508 throw transwarp::invalid_parameter{"task type"};
509 }
510
511
512 /// String conversion for the itask class
513 inline
514 std::string to_string(const transwarp::itask& task, transwarp::str_view separator="\n") {
515 std::string s;
516 s += '"';
517 const transwarp::option_str& name = task.name();
518 if (name) {
519 s += std::string{"<"} + *name + std::string{">"} + separator.data();
520 }
521 s += transwarp::to_string(task.type());
522 s += std::string{" id="} + std::to_string(task.id());
523 s += std::string{" lev="} + std::to_string(task.level());
524 const std::shared_ptr<transwarp::executor> exec = task.executor();
525 if (exec) {
526 s += separator.data() + std::string{"<"} + exec->name() + std::string{">"};
527 }
528 const std::int64_t avg_idletime_us = task.avg_idletime_us();
529 if (avg_idletime_us >= 0) {
530 s += separator.data() + std::string{"avg-idle-us="} + std::to_string(avg_idletime_us);
531 }
532 const std::int64_t avg_waittime_us = task.avg_waittime_us();
533 if (avg_waittime_us >= 0) {
534 s += separator.data() + std::string{"avg-wait-us="} + std::to_string(avg_waittime_us);
535 }
536 const std::int64_t avg_runtime_us = task.avg_runtime_us();
537 if (avg_runtime_us >= 0) {
538 s += separator.data() + std::string{"avg-run-us="} + std::to_string(avg_runtime_us);
539 }
540 return s + '"';
541 }
542
543
544 /// String conversion for the edge class
545 inline
546 std::string to_string(const transwarp::edge& edge, transwarp::str_view separator="\n") {
547 return transwarp::to_string(edge.parent(), separator) + std::string{" -> "} + transwarp::to_string(edge.child(), separator);
548 }
549
550
551 /// Creates a dot-style string from the given edges
552 inline
553 std::string to_string(const std::vector<transwarp::edge>& edges, transwarp::str_view separator="\n") {
554 std::string dot = std::string{"digraph {"} + separator.data();
555 for (const transwarp::edge& edge : edges) {
556 dot += transwarp::to_string(edge, separator) + separator.data();
557 }
558 dot += std::string{"}"};
559 return dot;
560 }
561
562
/// Strips a reference and then a top-level const from T.
/// Unlike std::decay this leaves volatile, arrays and functions untouched.
template<typename T>
struct decay {
private:
    using without_reference = typename std::remove_reference<T>::type;
public:
    using type = typename std::remove_const<without_reference>::type;
};
568
569
/// Yields, as member type, whatever std::shared_future<T>::get() returns
template<typename T>
struct result {
private:
    using future_type = std::shared_future<T>;
public:
    using type = decltype(std::declval<future_type>().get());
};
575
576
577 /// Detail namespace for internal functionality only
578 namespace detail {
579
580 /// Clones a task
581 template<typename TaskType>
clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>,std::shared_ptr<transwarp::itask>> & task_cache,const std::shared_ptr<TaskType> & t)582 std::shared_ptr<TaskType> clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<TaskType>& t) {
583 const auto original_task = std::static_pointer_cast<transwarp::itask>(t);
584 const auto task_cache_it = task_cache.find(original_task);
585 if (task_cache_it != task_cache.cend()) {
586 return std::static_pointer_cast<TaskType>(task_cache_it->second);
587 } else {
588 auto cloned_task = t->clone_impl(task_cache);
589 task_cache[original_task] = cloned_task;
590 return cloned_task;
591 }
592 }
593
594 } // detail
595
596
597 /// The task class
598 template<typename ResultType>
599 class task : public transwarp::itask {
600 public:
601 using result_type = ResultType;
602
603 virtual ~task() = default;
604
clone()605 std::shared_ptr<task> clone() const {
606 std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
607 return clone_impl(task_cache);
608 }
609
610 virtual void set_value(const typename transwarp::decay<result_type>::type& value) = 0;
611 virtual void set_value(typename transwarp::decay<result_type>::type&& value) = 0;
612 virtual std::shared_future<result_type> future() const noexcept = 0;
613 virtual typename transwarp::result<result_type>::type get() const = 0;
614
615 private:
616 template<typename T>
617 friend std::shared_ptr<T> transwarp::detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<T>& t);
618
619 virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const = 0;
620 };
621
622 /// The task class (reference result type)
623 template<typename ResultType>
624 class task<ResultType&> : public transwarp::itask {
625 public:
626 using result_type = ResultType&;
627
628 virtual ~task() = default;
629
clone()630 std::shared_ptr<task> clone() const {
631 std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
632 return clone_impl(task_cache);
633 }
634
635 virtual void set_value(typename transwarp::decay<result_type>::type& value) = 0;
636 virtual std::shared_future<result_type> future() const noexcept = 0;
637 virtual typename transwarp::result<result_type>::type get() const = 0;
638
639 private:
640 template<typename T>
641 friend std::shared_ptr<T> transwarp::detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<T>& t);
642
643 virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const = 0;
644 };
645
646 /// The task class (void result type)
647 template<>
648 class task<void> : public transwarp::itask {
649 public:
650 using result_type = void;
651
652 virtual ~task() = default;
653
clone()654 std::shared_ptr<task> clone() const {
655 std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
656 return clone_impl(task_cache);
657 }
658
659 virtual void set_value() = 0;
660 virtual std::shared_future<result_type> future() const noexcept = 0;
661 virtual result_type get() const = 0;
662
663 private:
664 template<typename T>
665 friend std::shared_ptr<T> transwarp::detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<T>& t);
666
667 virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const = 0;
668 };
669
670
671 /// Detail namespace for internal functionality only
672 namespace detail {
673
674 template<bool>
675 struct assign_task_if_impl;
676
677 } // detail
678
679
680 /// A base class for a user-defined functor that needs access to the associated
681 /// task or a cancel point to stop a task while it's running
682 class functor {
683 public:
684
685 virtual ~functor() = default;
686
687 protected:
688
689 /// The associated task (only to be called after the task was constructed)
transwarp_task()690 const transwarp::itask& transwarp_task() const noexcept {
691 return *transwarp_task_;
692 }
693
694 /// The associated task (only to be called after the task was constructed)
transwarp_task()695 transwarp::itask& transwarp_task() noexcept {
696 return *transwarp_task_;
697 }
698
699 /// If the associated task is canceled then this will throw transwarp::task_canceled
700 /// which will stop the task while it's running (only to be called after the task was constructed)
transwarp_cancel_point()701 void transwarp_cancel_point() const {
702 if (transwarp_task_->canceled()) {
703 throw transwarp::task_canceled{std::to_string(transwarp_task_->id())};
704 }
705 }
706
707 private:
708 template<bool>
709 friend struct transwarp::detail::assign_task_if_impl;
710
711 transwarp::itask* transwarp_task_{};
712 };
713
714
715 /// Detail namespace for internal functionality only
716 namespace detail {
717
718
719 /// A simple thread pool used to execute tasks in parallel
720 class thread_pool {
721 public:
722
723 explicit thread_pool(const std::size_t n_threads,
724 std::function<void(std::size_t thread_index)> on_thread_started = nullptr)
725 : on_thread_started_{std::move(on_thread_started)}
726 {
727 if (n_threads == 0) {
728 throw transwarp::invalid_parameter{"number of threads"};
729 }
730 for (std::size_t i = 0; i < n_threads; ++i) {
731 std::thread thread;
732 try {
733 thread = std::thread(&thread_pool::worker, this, i);
catch(...)734 } catch (...) {
735 shutdown();
736 throw;
737 }
738 try {
739 threads_.push_back(std::move(thread));
catch(...)740 } catch (...) {
741 shutdown();
742 thread.join();
743 throw;
744 }
745 }
746 }
747
748 // delete copy/move semantics
749 thread_pool(const thread_pool&) = delete;
750 thread_pool& operator=(const thread_pool&) = delete;
751 thread_pool(thread_pool&&) = delete;
752 thread_pool& operator=(thread_pool&&) = delete;
753
~thread_pool()754 ~thread_pool() {
755 shutdown();
756 }
757
push(const std::function<void ()> & functor)758 void push(const std::function<void()>& functor) {
759 {
760 std::lock_guard<std::mutex> lock{mutex_};
761 functors_.push(functor);
762 }
763 cond_var_.notify_one();
764 }
765
766 private:
767
worker(const std::size_t index)768 void worker(const std::size_t index) {
769 if (on_thread_started_) {
770 on_thread_started_(index);
771 }
772 for (;;) {
773 std::function<void()> functor;
774 {
775 std::unique_lock<std::mutex> lock{mutex_};
776 cond_var_.wait(lock, [this]{
777 return done_ || !functors_.empty();
778 });
779 if (done_ && functors_.empty()) {
780 break;
781 }
782 functor = std::move(functors_.front());
783 functors_.pop();
784 }
785 functor();
786 }
787 }
788
shutdown()789 void shutdown() {
790 {
791 std::lock_guard<std::mutex> lock{mutex_};
792 done_ = true;
793 }
794 cond_var_.notify_all();
795 for (std::thread& thread : threads_) {
796 thread.join();
797 }
798 threads_.clear();
799 }
800
801 bool done_ = false;
802 std::function<void(std::size_t)> on_thread_started_;
803 std::vector<std::thread> threads_;
804 std::queue<std::function<void()>> functors_;
805 std::condition_variable cond_var_;
806 std::mutex mutex_;
807 };
808
809
#ifdef TRANSWARP_CPP11
/// A compile-time list of indices
template<std::size_t...> struct indices {};

/// Builds an indices<> list recursively: the current index is appended to
/// the collected ones and then incremented, until it equals the end index
template<std::size_t...> struct construct_range;

template<std::size_t last, std::size_t current, std::size_t... collected>
struct construct_range<last, current, collected...> : construct_range<last, current + 1, collected..., current> {};

template<std::size_t last, std::size_t... collected>
struct construct_range<last, last, collected...> {
    using type = transwarp::detail::indices<collected...>;
};

/// index_range<b, e>::type is indices<b, b+1, ..., e-1>
template<std::size_t b, std::size_t e>
struct index_range {
    using type = typename transwarp::detail::construct_range<e, b>::type;
};

/// End of recursion: no indices left, nothing to call
template<typename Functor, typename Tuple>
void call_with_each_index(transwarp::detail::indices<>, Functor&&, Tuple&&) {}

/// Calls f with the tuple element at the first index and then recurses on
/// the remaining indices
template<std::size_t head, std::size_t... tail, typename Functor, typename Tuple>
void call_with_each_index(transwarp::detail::indices<head, tail...>, Functor&& f, Tuple&& t) {
    f(std::get<head>(t));
    using remaining = transwarp::detail::indices<tail...>;
    transwarp::detail::call_with_each_index(remaining{}, std::forward<Functor>(f), std::forward<Tuple>(t));
}
#endif
837
/// Calls f once for every element of the tuple t, in element order.
/// f is always invoked as an lvalue, in both the C++11 and the C++17 code
/// path. Forwarding f for every element would treat the same functor as an
/// rvalue repeatedly, which is only safe for the last call.
/// @param f Callable accepting every element type of t
/// @param t Tuple whose elements are passed to f
template<typename Functor, typename Tuple>
void apply_to_each(Functor&& f, Tuple&& t) {
#ifdef TRANSWARP_CPP11
    constexpr std::size_t n = std::tuple_size<typename std::decay<Tuple>::type>::value;
    using index_t = typename transwarp::detail::index_range<0, n>::type;
    transwarp::detail::call_with_each_index(index_t{}, std::forward<Functor>(f), std::forward<Tuple>(t));
#else
    // the comma fold guarantees left-to-right evaluation
    std::apply([&f](auto&&... arg){ (..., f(std::forward<decltype(arg)>(arg))); }, std::forward<Tuple>(t));
#endif
}
848
/// Calls f for every element of the constant vector v, from front to back.
/// f is handed to std::for_each, which works on its own copy of the functor.
template<typename Functor, typename ElementType>
void apply_to_each(Functor&& f, const std::vector<ElementType>& v) {
    std::for_each(v.cbegin(), v.cend(), std::forward<Functor>(f));
}
853
/// Calls f for every element of the vector v, from front to back.
/// The elements are passed as non-const lvalues, so f may modify them.
/// f is handed to std::for_each, which works on its own copy of the functor.
template<typename Functor, typename ElementType>
void apply_to_each(Functor&& f, std::vector<ElementType>& v) {
    std::for_each(std::begin(v), std::end(v), std::forward<Functor>(f));
}
858
859
860 template<int offset, typename... ParentResults>
861 struct assign_futures_impl {
862 static void work(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& source, std::tuple<std::shared_future<ParentResults>...>& target) {
863 std::get<offset>(target) = std::get<offset>(source)->future();
864 assign_futures_impl<offset - 1, ParentResults...>::work(source, target);
865 }
866 };
867
868 template<typename... ParentResults>
869 struct assign_futures_impl<-1, ParentResults...> {
870 static void work(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>&, std::tuple<std::shared_future<ParentResults>...>&) {}
871 };
872
873 /// Returns the futures from the given tuple of tasks
874 template<typename... ParentResults>
875 std::tuple<std::shared_future<ParentResults>...> get_futures(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& input) {
876 std::tuple<std::shared_future<ParentResults>...> result;
877 assign_futures_impl<static_cast<int>(sizeof...(ParentResults)) - 1, ParentResults...>::work(input, result);
878 return result;
879 }
880
881 /// Returns the futures from the given vector of tasks
882 template<typename ParentResultType>
883 std::vector<std::shared_future<ParentResultType>> get_futures(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& input) {
884 std::vector<std::shared_future<ParentResultType>> result;
885 result.reserve(input.size());
886 for (const std::shared_ptr<transwarp::task<ParentResultType>>& task : input) {
887 result.emplace_back(task->future());
888 }
889 return result;
890 }
891
892
893 /// Runs the task with the given arguments, hence, invoking the task's functor
894 template<typename Result, typename Task, typename... Args>
895 Result run_task(std::size_t task_id, const std::weak_ptr<Task>& task, Args&&... args) {
896 const std::shared_ptr<Task> t = task.lock();
897 if (!t) {
898 throw transwarp::task_destroyed{std::to_string(task_id)};
899 }
900 if (t->canceled()) {
901 throw transwarp::task_canceled{std::to_string(task_id)};
902 }
903 t->raise_event(transwarp::event_type::before_invoked);
904 return (*t->functor_)(std::forward<Args>(args)...);
905 }
906
907
/// Functor that blocks until the future of the pointed-to object is ready
struct wait_for_all_functor {
    template<typename T>
    void operator()(const T& p) const {
        const auto& pending = p->future();
        pending.wait();
    }
};
914
915 /// Waits for all parents to finish
916 template<typename... ParentResults>
917 void wait_for_all(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
918 transwarp::detail::apply_to_each(transwarp::detail::wait_for_all_functor{}, parents);
919 }
920
921 /// Waits for all parents to finish
922 template<typename ParentResultType>
923 void wait_for_all(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
924 transwarp::detail::apply_to_each(transwarp::detail::wait_for_all_functor{}, parents);
925 }
926
927
/// End of the wait_for_any_impl recursion: no parents are left to check,
/// so a value-initialized (empty) Parent is returned.
template<typename Parent>
Parent wait_for_any_impl() {
    return Parent();
}
932
933 template<typename Parent, typename ParentResult, typename... ParentResults>
934 Parent wait_for_any_impl(const std::shared_ptr<transwarp::task<ParentResult>>& parent, const std::shared_ptr<transwarp::task<ParentResults>>& ...parents) {
935 const std::future_status status = parent->future().wait_for(std::chrono::microseconds(1));
936 if (status == std::future_status::ready) {
937 return parent;
938 }
939 return transwarp::detail::wait_for_any_impl<Parent>(parents...);
940 }
941
942 /// Waits for the first parent to finish
943 template<typename Parent, typename... ParentResults>
944 Parent wait_for_any(const std::shared_ptr<transwarp::task<ParentResults>>& ...parents) {
945 for (;;) {
946 Parent parent = transwarp::detail::wait_for_any_impl<Parent>(parents...);
947 if (parent) {
948 return parent;
949 }
950 }
951 }
952
953
954 /// Waits for the first parent to finish
955 template<typename ParentResultType>
956 std::shared_ptr<transwarp::task<ParentResultType>> wait_for_any(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
957 for (;;) {
958 for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
959 const std::future_status status = parent->future().wait_for(std::chrono::microseconds(1));
960 if (status == std::future_status::ready) {
961 return parent;
962 }
963 }
964 }
965 }
966
967
968 template<typename OneResult>
969 struct cancel_all_but_one_functor {
970 explicit cancel_all_but_one_functor(const std::shared_ptr<transwarp::task<OneResult>>& one) noexcept
971 : one_(one) {}
972
973 template<typename T>
974 void operator()(const T& parent) const {
975 if (one_ != parent) {
976 parent->cancel(true);
977 }
978 }
979
980 const std::shared_ptr<transwarp::task<OneResult>>& one_;
981 };
982
983 /// Cancels all tasks but one
984 template<typename OneResult, typename... ParentResults>
985 void cancel_all_but_one(const std::shared_ptr<transwarp::task<OneResult>>& one, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
986 transwarp::detail::apply_to_each(transwarp::detail::cancel_all_but_one_functor<OneResult>{one}, parents);
987 }
988
989 /// Cancels all tasks but one
990 template<typename OneResult, typename ParentResultType>
991 void cancel_all_but_one(const std::shared_ptr<transwarp::task<OneResult>>& one, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
992 transwarp::detail::apply_to_each(transwarp::detail::cancel_all_but_one_functor<OneResult>{one}, parents);
993 }
994
995
/// Functor that calls decrement_refcount() on the pointed-to task
struct decrement_refcount_functor {
    template<typename T>
    void operator()(const T& task) const {
        (*task).decrement_refcount();
    }
};
1002
1003 /// Decrements the refcount of all parents
1004 template<typename... ParentResults>
1005 void decrement_refcount(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1006 transwarp::detail::apply_to_each(transwarp::detail::decrement_refcount_functor{}, parents);
1007 }
1008
1009 /// Decrements the refcount of all parents
1010 template<typename ParentResultType>
1011 void decrement_refcount(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1012 transwarp::detail::apply_to_each(transwarp::detail::decrement_refcount_functor{}, parents);
1013 }
1014
1015
1016 template<typename TaskType, bool done, int total, int... n>
1017 struct call_impl {
1018 template<typename Result, typename Task, typename... ParentResults>
1019 static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1020 return call_impl<TaskType, total == 1 + static_cast<int>(sizeof...(n)), total, n..., static_cast<int>(sizeof...(n))>::template
1021 work<Result>(task_id, task, parents);
1022 }
1023 };
1024
/// Dispatcher for tasks whose parents are given as a vector.
/// Only the specializations per task type are defined.
template<typename>
struct call_impl_vector;
1027
1028 template<int total, int... n>
1029 struct call_impl<transwarp::root_type, true, total, n...> {
1030 template<typename Result, typename Task, typename... ParentResults>
1031 static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>&) {
1032 return transwarp::detail::run_task<Result>(task_id, task);
1033 }
1034 };
1035
1036 template<>
1037 struct call_impl_vector<transwarp::root_type> {
1038 template<typename Result, typename Task, typename ParentResultType>
1039 static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>&) {
1040 return transwarp::detail::run_task<Result>(task_id, task);
1041 }
1042 };
1043
1044 template<int total, int... n>
1045 struct call_impl<transwarp::accept_type, true, total, n...> {
1046 template<typename Result, typename Task, typename... ParentResults>
1047 static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1048 transwarp::detail::wait_for_all(parents);
1049 const std::tuple<std::shared_future<ParentResults>...> futures = transwarp::detail::get_futures(parents);
1050 transwarp::detail::decrement_refcount(parents);
1051 return transwarp::detail::run_task<Result>(task_id, task, std::get<n>(futures)...);
1052 }
1053 };
1054
1055 template<>
1056 struct call_impl_vector<transwarp::accept_type> {
1057 template<typename Result, typename Task, typename ParentResultType>
1058 static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1059 transwarp::detail::wait_for_all(parents);
1060 std::vector<std::shared_future<ParentResultType>> futures = transwarp::detail::get_futures(parents);
1061 transwarp::detail::decrement_refcount(parents);
1062 return transwarp::detail::run_task<Result>(task_id, task, std::move(futures));
1063 }
1064 };
1065
1066 template<int total, int... n>
1067 struct call_impl<transwarp::accept_any_type, true, total, n...> {
1068 template<typename Result, typename Task, typename... ParentResults>
1069 static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1070 using parent_t = typename std::remove_reference<decltype(std::get<0>(parents))>::type; // Use first type as reference
1071 parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
1072 transwarp::detail::cancel_all_but_one(parent, parents);
1073 auto future = parent->future();
1074 transwarp::detail::decrement_refcount(parents);
1075 return transwarp::detail::run_task<Result>(task_id, task, std::move(future));
1076 }
1077 };
1078
1079 template<>
1080 struct call_impl_vector<transwarp::accept_any_type> {
1081 template<typename Result, typename Task, typename ParentResultType>
1082 static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1083 std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
1084 transwarp::detail::cancel_all_but_one(parent, parents);
1085 auto future = parent->future();
1086 transwarp::detail::decrement_refcount(parents);
1087 return transwarp::detail::run_task<Result>(task_id, task, std::move(future));
1088 }
1089 };
1090
1091 template<int total, int... n>
1092 struct call_impl<transwarp::consume_type, true, total, n...> {
1093 template<typename Result, typename Task, typename... ParentResults>
1094 static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1095 transwarp::detail::wait_for_all(parents);
1096 const std::tuple<std::shared_future<ParentResults>...> futures = transwarp::detail::get_futures(parents);
1097 transwarp::detail::decrement_refcount(parents);
1098 return transwarp::detail::run_task<Result>(task_id, task, std::get<n>(futures).get()...);
1099 }
1100 };
1101
1102 template<>
1103 struct call_impl_vector<transwarp::consume_type> {
1104 template<typename Result, typename Task, typename ParentResultType>
1105 static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1106 transwarp::detail::wait_for_all(parents);
1107 const std::vector<std::shared_future<ParentResultType>> futures = transwarp::detail::get_futures(parents);
1108 transwarp::detail::decrement_refcount(parents);
1109 std::vector<ParentResultType> results;
1110 results.reserve(futures.size());
1111 for (const std::shared_future<ParentResultType>& future : futures) {
1112 results.emplace_back(future.get());
1113 }
1114 return transwarp::detail::run_task<Result>(task_id, task, std::move(results));
1115 }
1116 };
1117
1118 template<int total, int... n>
1119 struct call_impl<transwarp::consume_any_type, true, total, n...> {
1120 template<typename Result, typename Task, typename... ParentResults>
1121 static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1122 using parent_t = typename std::remove_reference<decltype(std::get<0>(parents))>::type; /// Use first type as reference
1123 parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
1124 transwarp::detail::cancel_all_but_one(parent, parents);
1125 const auto future = parent->future();
1126 transwarp::detail::decrement_refcount(parents);
1127 return transwarp::detail::run_task<Result>(task_id, task, future.get());
1128 }
1129 };
1130
1131 template<>
1132 struct call_impl_vector<transwarp::consume_any_type> {
1133 template<typename Result, typename Task, typename ParentResultType>
1134 static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1135 std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
1136 transwarp::detail::cancel_all_but_one(parent, parents);
1137 const auto future = parent->future();
1138 transwarp::detail::decrement_refcount(parents);
1139 return transwarp::detail::run_task<Result>(task_id, task, future.get());
1140 }
1141 };
1142
/// Functor that calls get() on a shared future and discards the result.
/// An exception stored in the future is rethrown by get().
struct future_get_functor {
    template<typename T>
    void operator()(const std::shared_future<T>& f) const {
        static_cast<void>(f.get());
    }
};
1149
1150 template<int total, int... n>
1151 struct call_impl<transwarp::wait_type, true, total, n...> {
1152 template<typename Result, typename Task, typename... ParentResults>
1153 static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1154 transwarp::detail::wait_for_all(parents);
1155 const std::tuple<std::shared_future<ParentResults>...> futures = transwarp::detail::get_futures(parents);
1156 transwarp::detail::decrement_refcount(parents);
1157 transwarp::detail::apply_to_each(transwarp::detail::future_get_functor{}, futures); // Ensures that exceptions are propagated
1158 return transwarp::detail::run_task<Result>(task_id, task);
1159 }
1160 };
1161
1162 template<>
1163 struct call_impl_vector<transwarp::wait_type> {
1164 template<typename Result, typename Task, typename ParentResultType>
1165 static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1166 transwarp::detail::wait_for_all(parents);
1167 const std::vector<std::shared_future<ParentResultType>> futures = transwarp::detail::get_futures(parents);
1168 transwarp::detail::decrement_refcount(parents);
1169 transwarp::detail::apply_to_each(transwarp::detail::future_get_functor{}, futures); // Ensures that exceptions are propagated
1170 return transwarp::detail::run_task<Result>(task_id, task);
1171 }
1172 };
1173
1174 template<int total, int... n>
1175 struct call_impl<transwarp::wait_any_type, true, total, n...> {
1176 template<typename Result, typename Task, typename... ParentResults>
1177 static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1178 using parent_t = typename std::remove_reference<decltype(std::get<0>(parents))>::type; // Use first type as reference
1179 parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
1180 transwarp::detail::cancel_all_but_one(parent, parents);
1181 const auto future = parent->future();
1182 transwarp::detail::decrement_refcount(parents);
1183 future.get(); // Ensures that exceptions are propagated
1184 return transwarp::detail::run_task<Result>(task_id, task);
1185 }
1186 };
1187
1188 template<>
1189 struct call_impl_vector<transwarp::wait_any_type> {
1190 template<typename Result, typename Task, typename ParentResultType>
1191 static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1192 std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
1193 transwarp::detail::cancel_all_but_one(parent, parents);
1194 const auto future = parent->future();
1195 transwarp::detail::decrement_refcount(parents);
1196 future.get(); // Ensures that exceptions are propagated
1197 return transwarp::detail::run_task<Result>(task_id, task);
1198 }
1199 };
1200
1201 /// Calls the functor of the given task with the results from the tuple of parents.
1202 /// Throws transwarp::task_canceled if the task is canceled.
1203 /// Throws transwarp::task_destroyed in case the task was destroyed prematurely.
1204 template<typename TaskType, typename Result, typename Task, typename... ParentResults>
1205 Result call(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1206 constexpr std::size_t n = std::tuple_size<std::tuple<std::shared_future<ParentResults>...>>::value;
1207 return transwarp::detail::call_impl<TaskType, 0 == n, static_cast<int>(n)>::template
1208 work<Result>(task_id, task, parents);
1209 }
1210
1211 /// Calls the functor of the given task with the results from the vector of parents.
1212 /// Throws transwarp::task_canceled if the task is canceled.
1213 /// Throws transwarp::task_destroyed in case the task was destroyed prematurely.
1214 template<typename TaskType, typename Result, typename Task, typename ParentResultType>
1215 Result call(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1216 return transwarp::detail::call_impl_vector<TaskType>::template
1217 work<Result>(task_id, task, parents);
1218 }
1219
1220
1221 template<typename Functor>
1222 struct call_with_each_functor {
1223 explicit call_with_each_functor(const Functor& f) noexcept
1224 : f_(f) {}
1225
1226 template<typename T>
1227 void operator()(const T& task) const {
1228 if (!task) {
1229 throw transwarp::invalid_parameter{"task pointer"};
1230 }
1231 f_(*task);
1232 }
1233
1234 const Functor& f_;
1235 };
1236
1237 /// Calls the functor with every element in the tuple
1238 template<typename Functor, typename... ParentResults>
1239 void call_with_each(const Functor& f, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& t) {
1240 transwarp::detail::apply_to_each(transwarp::detail::call_with_each_functor<Functor>{f}, t);
1241 }
1242
1243 /// Calls the functor with every element in the vector
1244 template<typename Functor, typename ParentResultType>
1245 void call_with_each(const Functor& f, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& v) {
1246 transwarp::detail::apply_to_each(transwarp::detail::call_with_each_functor<Functor>{f}, v);
1247 }
1248
1249
1250 /// Sets level of a task and increments the child count
1251 struct parent_visitor {
1252 explicit parent_visitor(transwarp::itask& task) noexcept
1253 : task_(task) {}
1254
1255 void operator()(transwarp::itask& task) const {
1256 if (task_.level() <= task.level()) {
1257 // A child's level is always larger than any of its parents' levels
1258 task_.set_level(task.level() + 1);
1259 }
1260 task.increment_childcount();
1261 }
1262
1263 transwarp::itask& task_;
1264 };
1265
1266 /// Applies final bookkeeping to the task and collects the task
1267 struct final_visitor {
1268 explicit final_visitor(std::vector<transwarp::itask*>& tasks) noexcept
1269 : tasks_(tasks) {}
1270
1271 void operator()(transwarp::itask& task) noexcept {
1272 tasks_.push_back(&task);
1273 task.set_id(id_++);
1274 }
1275
1276 std::vector<transwarp::itask*>& tasks_;
1277 std::size_t id_ = 0;
1278 };
1279
1280 /// Generates edges
1281 struct edges_visitor {
1282 explicit edges_visitor(std::vector<transwarp::edge>& edges) noexcept
1283 : edges_(edges) {}
1284
1285 void operator()(transwarp::itask& task) {
1286 for (transwarp::itask* parent : task.parents()) {
1287 edges_.emplace_back(*parent, task);
1288 }
1289 }
1290
1291 std::vector<transwarp::edge>& edges_;
1292 };
1293
1294 /// Schedules using the given executor
1295 struct schedule_visitor {
1296 schedule_visitor(bool reset, transwarp::executor* executor) noexcept
1297 : reset_(reset), executor_(executor) {}
1298
1299 void operator()(transwarp::itask& task) {
1300 task.schedule_impl(reset_, executor_);
1301 }
1302
1303 bool reset_;
1304 transwarp::executor* executor_;
1305 };
1306
1307 /// Resets the given task
1308 struct reset_visitor {
1309
1310 void operator()(transwarp::itask& task) const {
1311 task.reset();
1312 }
1313 };
1314
1315 /// Cancels or resumes the given task
1316 struct cancel_visitor {
1317 explicit cancel_visitor(bool enabled) noexcept
1318 : enabled_{enabled} {}
1319
1320 void operator()(transwarp::itask& task) const noexcept {
1321 task.cancel(enabled_);
1322 }
1323
1324 bool enabled_;
1325 };
1326
1327 /// Assigns an executor to the given task
1328 struct set_executor_visitor {
1329 explicit set_executor_visitor(std::shared_ptr<transwarp::executor> executor) noexcept
1330 : executor_{std::move(executor)} {}
1331
1332 void operator()(transwarp::itask& task) const noexcept {
1333 task.set_executor(executor_);
1334 }
1335
1336 std::shared_ptr<transwarp::executor> executor_;
1337 };
1338
1339 /// Removes the executor from the given task
1340 struct remove_executor_visitor {
1341
1342 void operator()(transwarp::itask& task) const noexcept {
1343 task.remove_executor();
1344 }
1345 };
1346
1347 /// Assigns a priority to the given task
1348 struct set_priority_visitor {
1349 explicit set_priority_visitor(std::int64_t priority) noexcept
1350 : priority_{priority} {}
1351
1352 void operator()(transwarp::itask& task) const noexcept {
1353 task.set_priority(priority_);
1354 }
1355
1356 std::int64_t priority_;
1357 };
1358
1359 /// Resets the priority of the given task
1360 struct reset_priority_visitor {
1361
1362 void operator()(transwarp::itask& task) const noexcept {
1363 task.reset_priority();
1364 }
1365 };
1366
1367 /// Assigns custom data to the given task
1368 struct set_custom_data_visitor {
1369 explicit set_custom_data_visitor(transwarp::any_data custom_data) noexcept
1370 : custom_data_{std::move(custom_data)} {}
1371
1372 void operator()(transwarp::itask& task) const noexcept {
1373 task.set_custom_data(custom_data_);
1374 }
1375
1376 transwarp::any_data custom_data_;
1377 };
1378
1379 /// Removes custom data from the given task
1380 struct remove_custom_data_visitor {
1381
1382 void operator()(transwarp::itask& task) const noexcept {
1383 task.remove_custom_data();
1384 }
1385 };
1386
1387 /// Pushes the given task into the vector of tasks
1388 struct push_task_visitor {
1389 explicit push_task_visitor(std::vector<transwarp::itask*>& tasks)
1390 : tasks_(tasks) {}
1391
1392 void operator()(transwarp::itask& task) {
1393 tasks_.push_back(&task);
1394 }
1395
1396 std::vector<transwarp::itask*>& tasks_;
1397 };
1398
1399 /// Adds a new listener to the given task
1400 struct add_listener_visitor {
1401 explicit add_listener_visitor(std::shared_ptr<transwarp::listener> listener)
1402 : listener_(std::move(listener))
1403 {}
1404
1405 void operator()(transwarp::itask& task) {
1406 task.add_listener(listener_);
1407 }
1408
1409 std::shared_ptr<transwarp::listener> listener_;
1410 };
1411
1412 /// Adds a new listener per event type to the given task
1413 struct add_listener_per_event_visitor {
1414 add_listener_per_event_visitor(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener)
1415 : event_(event), listener_(std::move(listener))
1416 {}
1417
1418 void operator()(transwarp::itask& task) {
1419 task.add_listener(event_, listener_);
1420 }
1421
1422 transwarp::event_type event_;
1423 std::shared_ptr<transwarp::listener> listener_;
1424 };
1425
1426 /// Removes a listener from the given task
1427 struct remove_listener_visitor {
1428 explicit remove_listener_visitor(std::shared_ptr<transwarp::listener> listener)
1429 : listener_(std::move(listener))
1430 {}
1431
1432 void operator()(transwarp::itask& task) {
1433 task.remove_listener(listener_);
1434 }
1435
1436 std::shared_ptr<transwarp::listener> listener_;
1437 };
1438
1439 /// Removes a listener per event type from the given task
1440 struct remove_listener_per_event_visitor {
1441 remove_listener_per_event_visitor(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener)
1442 : event_(event), listener_(std::move(listener))
1443 {}
1444
1445 void operator()(transwarp::itask& task) {
1446 task.remove_listener(event_, listener_);
1447 }
1448
1449 transwarp::event_type event_;
1450 std::shared_ptr<transwarp::listener> listener_;
1451 };
1452
1453 /// Removes all listeners from the given task
1454 struct remove_listeners_visitor {
1455
1456 void operator()(transwarp::itask& task) {
1457 task.remove_listeners();
1458 }
1459
1460 };
1461
1462 /// Removes all listeners per event type from the given task
1463 struct remove_listeners_per_event_visitor {
1464 explicit remove_listeners_per_event_visitor(transwarp::event_type event)
1465 : event_(event)
1466 {}
1467
1468 void operator()(transwarp::itask& task) {
1469 task.remove_listeners(event_);
1470 }
1471
1472 transwarp::event_type event_;
1473 };
1474
1475 /// Visits the given task using the visitor given in the constructor
1476 struct visit_visitor {
1477 explicit visit_visitor(const std::function<void(transwarp::itask&)>& visitor) noexcept
1478 : visitor_(visitor) {}
1479
1480 void operator()(transwarp::itask& task) const {
1481 task.visit(visitor_);
1482 }
1483
1484 const std::function<void(transwarp::itask&)>& visitor_;
1485 };
1486
1487 /// Unvisits the given task
1488 struct unvisit_visitor {
1489
1490 void operator()(transwarp::itask& task) const noexcept {
1491 task.unvisit();
1492 }
1493 };
1494
1495 /// Determines the result type of the Functor dispatching on the task type
1496 template<typename TaskType, typename Functor, typename... ParentResults>
1497 struct functor_result {
1498 static_assert(std::is_same<TaskType, transwarp::root_type>::value ||
1499 std::is_same<TaskType, transwarp::accept_type>::value ||
1500 std::is_same<TaskType, transwarp::accept_any_type>::value ||
1501 std::is_same<TaskType, transwarp::consume_type>::value ||
1502 std::is_same<TaskType, transwarp::consume_any_type>::value ||
1503 std::is_same<TaskType, transwarp::wait_type>::value ||
1504 std::is_same<TaskType, transwarp::wait_any_type>::value,
1505 "Invalid task type, must be one of: root, accept, accept_any, consume, consume_any, wait, wait_any");
1506 };
1507
1508 template<typename Functor, typename... ParentResults>
1509 struct functor_result<transwarp::root_type, Functor, ParentResults...> {
1510 static_assert(sizeof...(ParentResults) == 0, "A root task cannot have parent tasks");
1511 using type = decltype(std::declval<Functor>()());
1512 };
1513
1514 template<typename Functor, typename... ParentResults>
1515 struct functor_result<transwarp::accept_type, Functor, ParentResults...> {
1516 static_assert(sizeof...(ParentResults) > 0, "An accept task must have at least one parent");
1517 using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResults>>()...));
1518 };
1519
1520 template<typename Functor, typename ParentResultType>
1521 struct functor_result<transwarp::accept_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1522 using type = decltype(std::declval<Functor>()(std::declval<std::vector<std::shared_future<ParentResultType>>>()));
1523 };
1524
1525 template<typename Functor, typename... ParentResults>
1526 struct functor_result<transwarp::accept_any_type, Functor, ParentResults...> {
1527 static_assert(sizeof...(ParentResults) > 0, "An accept_any task must have at least one parent");
1528 using arg_t = typename std::tuple_element<0, std::tuple<ParentResults...>>::type; // Using first type as reference
1529 using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<arg_t>>()));
1530 };
1531
1532 template<typename Functor, typename ParentResultType>
1533 struct functor_result<transwarp::accept_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1534 using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResultType>>()));
1535 };
1536
1537 template<typename Functor, typename... ParentResults>
1538 struct functor_result<transwarp::consume_type, Functor, ParentResults...> {
1539 static_assert(sizeof...(ParentResults) > 0, "A consume task must have at least one parent");
1540 using type = decltype(std::declval<Functor>()(std::declval<ParentResults>()...));
1541 };
1542
1543 template<typename Functor, typename ParentResultType>
1544 struct functor_result<transwarp::consume_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1545 using type = decltype(std::declval<Functor>()(std::declval<std::vector<ParentResultType>>()));
1546 };
1547
1548 template<typename Functor, typename... ParentResults>
1549 struct functor_result<transwarp::consume_any_type, Functor, ParentResults...> {
1550 static_assert(sizeof...(ParentResults) > 0, "A consume_any task must have at least one parent");
1551 using arg_t = typename std::tuple_element<0, std::tuple<ParentResults...>>::type; // Using first type as reference
1552 using type = decltype(std::declval<Functor>()(std::declval<arg_t>()));
1553 };
1554
1555 template<typename Functor, typename ParentResultType>
1556 struct functor_result<transwarp::consume_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1557 using type = decltype(std::declval<Functor>()(std::declval<ParentResultType>()));
1558 };
1559
1560 template<typename Functor, typename... ParentResults>
1561 struct functor_result<transwarp::wait_type, Functor, ParentResults...> {
1562 static_assert(sizeof...(ParentResults) > 0, "A wait task must have at least one parent");
1563 using type = decltype(std::declval<Functor>()());
1564 };
1565
1566 template<typename Functor, typename ParentResultType>
1567 struct functor_result<transwarp::wait_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1568 using type = decltype(std::declval<Functor>()());
1569 };
1570
1571 template<typename Functor, typename... ParentResults>
1572 struct functor_result<transwarp::wait_any_type, Functor, ParentResults...> {
1573 static_assert(sizeof...(ParentResults) > 0, "A wait_any task must have at least one parent");
1574 using type = decltype(std::declval<Functor>()());
1575 };
1576
1577 template<typename Functor, typename ParentResultType>
1578 struct functor_result<transwarp::wait_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1579 using type = decltype(std::declval<Functor>()());
1580 };
1581
1582
/// Helper for assign_task_if, selected by a compile-time flag.
/// Only the specializations for false and true are defined.
template<bool>
struct assign_task_if_impl;
1585
1586 template<>
1587 struct assign_task_if_impl<false> {
1588 template<typename Functor>
1589 void operator()(Functor&, transwarp::itask&) const noexcept {}
1590 };
1591
1592 template<>
1593 struct assign_task_if_impl<true> {
1594 template<typename Functor>
1595 void operator()(Functor& functor, transwarp::itask& task) const noexcept {
1596 functor.transwarp_task_ = &task;
1597 }
1598 };
1599
1600 /// Assigns the task to the given functor if the functor is a subclass of transwarp::functor
1601 template<typename Functor>
1602 void assign_task_if(Functor& functor, transwarp::itask& task) noexcept {
1603 transwarp::detail::assign_task_if_impl<std::is_base_of<transwarp::functor, Functor>::value>{}(functor, task);
1604 }
1605
1606
/// Creates a shared future that is already ready and holds the given value
template<typename ResultType, typename Value>
std::shared_future<ResultType> make_future_with_value(Value&& value) {
    std::promise<ResultType> fulfilled;
    fulfilled.set_value(std::forward<Value>(value));
    return fulfilled.get_future().share();
}
1614
/// Creates a shared future of void that is already ready
inline
std::shared_future<void> make_ready_future() {
    std::promise<void> fulfilled;
    fulfilled.set_value();
    return fulfilled.get_future().share();
}
1622
1623 /// Returns a ready future with the given exception as its state
1624 template<typename ResultType>
1625 std::shared_future<ResultType> make_future_with_exception(std::exception_ptr exception) {
1626 if (!exception) {
1627 throw transwarp::invalid_parameter{"exception pointer"};
1628 }
1629 std::promise<ResultType> promise;
1630 promise.set_exception(exception);
1631 return promise.get_future();
1632 }
1633
1634
1635 struct clone_task_functor {
1636 explicit clone_task_functor(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) noexcept
1637 : task_cache_(task_cache) {}
1638
1639 template<typename T>
1640 void operator()(T& t) {
1641 t = transwarp::detail::clone_task(task_cache_, t);
1642 }
1643
1644 std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache_;
1645 };
1646
1647
1648 struct push_task_functor {
1649 explicit push_task_functor(std::vector<transwarp::itask*>& tasks) noexcept
1650 : tasks_(tasks) {}
1651
1652 template<typename T>
1653 void operator()(T& t) {
1654 tasks_.push_back(t.get());
1655 }
1656
1657 std::vector<transwarp::itask*>& tasks_;
1658 };
1659
1660
1661 /// Determines the type of the parents
1662 template<typename... ParentResults>
1663 struct parents {
1664 using type = std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>;
1665 static std::size_t size(const type&) {
1666 return std::tuple_size<type>::value;
1667 }
1668 static type clone(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const type& obj) {
1669 type cloned = obj;
1670 transwarp::detail::apply_to_each(transwarp::detail::clone_task_functor{task_cache}, cloned);
1671 return cloned;
1672 }
1673 static std::vector<transwarp::itask*> tasks(const type& parents) {
1674 std::vector<transwarp::itask*> tasks;
1675 transwarp::detail::apply_to_each(transwarp::detail::push_task_functor{tasks}, parents);
1676 return tasks;
1677 }
1678 };
1679
1680 /// Determines the type of the parents. Specialization for vector parents
1681 template<typename ParentResultType>
1682 struct parents<std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1683 using type = std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>;
1684 static std::size_t size(const type& obj) {
1685 return obj.size();
1686 }
1687 static type clone(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const type& obj) {
1688 type cloned = obj;
1689 transwarp::detail::apply_to_each(transwarp::detail::clone_task_functor{task_cache}, cloned);
1690 return cloned;
1691 }
1692 static std::vector<transwarp::itask*> tasks(const type& parents) {
1693 std::vector<transwarp::itask*> tasks;
1694 transwarp::detail::apply_to_each(transwarp::detail::push_task_functor{tasks}, parents);
1695 return tasks;
1696 }
1697 };
1698
1699
1700 template<typename ResultType, typename TaskType>
1701 class base_runner {
1702 protected:
1703
1704 template<typename Task, typename Parents>
1705 void call(std::size_t task_id,
1706 const std::weak_ptr<Task>& task,
1707 const Parents& parents) {
1708 promise_.set_value(transwarp::detail::call<TaskType, ResultType>(task_id, task, parents));
1709 }
1710
1711 std::promise<ResultType> promise_;
1712 };
1713
1714 template<typename TaskType>
1715 class base_runner<void, TaskType> {
1716 protected:
1717
1718 template<typename Task, typename Parents>
1719 void call(std::size_t task_id,
1720 const std::weak_ptr<Task>& task,
1721 const Parents& parents) {
1722 transwarp::detail::call<TaskType, void>(task_id, task, parents);
1723 promise_.set_value();
1724 }
1725
1726 std::promise<void> promise_;
1727 };
1728
1729 /// A callable to run a task given its parents
1730 template<typename ResultType, typename TaskType, typename Task, typename Parents>
1731 class runner : public transwarp::detail::base_runner<ResultType, TaskType> {
1732 public:
1733
1734 runner(std::size_t task_id,
1735 const std::weak_ptr<Task>& task,
1736 const typename transwarp::decay<Parents>::type& parents)
1737 : task_id_(task_id),
1738 task_(task),
1739 parents_(parents)
1740 {}
1741
1742 std::future<ResultType> future() {
1743 return this->promise_.get_future();
1744 }
1745
1746 void operator()() {
1747 if (const std::shared_ptr<Task> t = task_.lock()) {
1748 t->raise_event(transwarp::event_type::before_started);
1749 }
1750 try {
1751 this->call(task_id_, task_, parents_);
1752 } catch (const transwarp::task_canceled&) {
1753 this->promise_.set_exception(std::current_exception());
1754 if (const std::shared_ptr<Task> t = task_.lock()) {
1755 t->raise_event(transwarp::event_type::after_canceled);
1756 }
1757 } catch (...) {
1758 this->promise_.set_exception(std::current_exception());
1759 }
1760 if (const std::shared_ptr<Task> t = task_.lock()) {
1761 t->raise_event(transwarp::event_type::after_finished);
1762 }
1763 }
1764
1765 private:
1766 const std::size_t task_id_;
1767 const std::weak_ptr<Task> task_;
1768 const typename transwarp::decay<Parents>::type parents_;
1769 };
1770
1771
1772 /// A simple circular buffer (FIFO).
1773 /// ValueType must support default construction. The buffer lets you push
1774 /// new values onto the back and pop old values off the front.
1775 template<typename ValueType>
1776 class circular_buffer {
1777 public:
1778
1779 static_assert(std::is_default_constructible<ValueType>::value, "ValueType must be default constructible");
1780
1781 using value_type = ValueType;
1782
1783 /// Constructs a circular buffer with a given fixed capacity
1784 explicit
1785 circular_buffer(std::size_t capacity)
1786 : data_(capacity)
1787 {
1788 if (capacity < 1) {
1789 throw transwarp::invalid_parameter{"capacity"};
1790 }
1791 }
1792
1793 // delete copy/move semantics
1794 circular_buffer(const circular_buffer&) = delete;
1795 circular_buffer& operator=(const circular_buffer&) = delete;
1796 circular_buffer(circular_buffer&& other) = delete;
1797 circular_buffer& operator=(circular_buffer&&) = delete;
1798
1799 /// Pushes a new value onto the end of the buffer. If that exceeds the capacity
1800 /// of the buffer then the oldest value gets dropped (the one at the front).
1801 template<typename T, typename = typename std::enable_if<std::is_same<typename std::decay<T>::type, value_type>::value>::type>
1802 void push(T&& value) {
1803 data_[end_] = std::forward<T>(value);
1804 increment();
1805 }
1806
1807 /// Returns the value at the front of the buffer (the oldest value).
1808 /// This is undefined if the buffer is empty
1809 const value_type& front() const {
1810 return data_[front_];
1811 }
1812
1813 /// Removes the value at the front of the buffer (the oldest value)
1814 void pop() {
1815 if (!empty()) {
1816 data_[front_] = ValueType{};
1817 decrement();
1818 }
1819 }
1820
1821 /// Returns the capacity of the buffer
1822 std::size_t capacity() const {
1823 return data_.size();
1824 }
1825
1826 /// Returns the number of populated values of the buffer. Its maximum value
1827 /// equals the capacity of the buffer
1828 std::size_t size() const {
1829 return size_;
1830 }
1831
1832 /// Returns whether the buffer is empty
1833 bool empty() const {
1834 return size_ == 0;
1835 }
1836
1837 /// Returns whether the buffer is full
1838 bool full() const {
1839 return size_ == data_.size();
1840 }
1841
1842 /// Swaps this buffer with the given buffer
1843 void swap(circular_buffer& buffer) {
1844 std::swap(end_, buffer.end_);
1845 std::swap(front_, buffer.front_);
1846 std::swap(size_, buffer.size_);
1847 std::swap(data_, buffer.data_);
1848 }
1849
1850 private:
1851
1852 void increment_or_wrap(std::size_t& value) const {
1853 if (value == data_.size() - 1) {
1854 value = 0;
1855 } else {
1856 ++value;
1857 }
1858 }
1859
1860 void increment() {
1861 increment_or_wrap(end_);
1862 if (full()) {
1863 increment_or_wrap(front_);
1864 } else {
1865 ++size_;
1866 }
1867 }
1868
1869 void decrement() {
1870 increment_or_wrap(front_);
1871 --size_;
1872 }
1873
1874 std::size_t end_{};
1875 std::size_t front_{};
1876 std::size_t size_{};
1877 std::vector<value_type> data_;
1878 };
1879
1880
/// A minimal spinlock built on std::atomic_flag.
/// lock() busy-waits until it manages to set the flag, unlock() clears it
class spinlock {
public:

    /// Spins until the flag was found clear and has been set by this call
    void lock() noexcept {
        for (;;) {
            const bool was_held = locked_.test_and_set(std::memory_order_acquire);
            if (!was_held) {
                return;
            }
        }
    }

    /// Clears the flag so that another lock() call can succeed
    void unlock() noexcept {
        locked_.clear(std::memory_order_release);
    }

private:
    std::atomic_flag locked_ = ATOMIC_FLAG_INIT;
};
1895
1896
1897 } // detail
1898
1899
/// A functor that does nothing when called
struct no_op_functor {
    void operator()() const noexcept {
        // intentionally empty
    }
};

/// A ready-made object to use in places where a no-op functor is required
constexpr no_op_functor no_op{};
1907
1908
1909 /// Executor for sequential execution. Runs functors sequentially on the same thread
1910 class sequential : public transwarp::executor {
1911 public:
1912
1913 sequential() = default;
1914
1915 // delete copy/move semantics
1916 sequential(const sequential&) = delete;
1917 sequential& operator=(const sequential&) = delete;
1918 sequential(sequential&&) = delete;
1919 sequential& operator=(sequential&&) = delete;
1920
1921 /// Returns the name of the executor
1922 std::string name() const override {
1923 return "transwarp::sequential";
1924 }
1925
1926 /// Runs the functor on the current thread
1927 void execute(const std::function<void()>& functor, transwarp::itask&) override {
1928 functor();
1929 }
1930 };
1931
1932
1933 /// Executor for parallel execution. Uses a simple thread pool
1934 class parallel : public transwarp::executor {
1935 public:
1936
1937 explicit parallel(const std::size_t n_threads,
1938 std::function<void(std::size_t thread_index)> on_thread_started = nullptr)
1939 : pool_{n_threads, std::move(on_thread_started)}
1940 {}
1941
1942 // delete copy/move semantics
1943 parallel(const parallel&) = delete;
1944 parallel& operator=(const parallel&) = delete;
1945 parallel(parallel&&) = delete;
1946 parallel& operator=(parallel&&) = delete;
1947
1948 /// Returns the name of the executor
1949 std::string name() const override {
1950 return "transwarp::parallel";
1951 }
1952
1953 /// Pushes the functor into the thread pool for asynchronous execution
1954 void execute(const std::function<void()>& functor, transwarp::itask&) override {
1955 pool_.push(functor);
1956 }
1957
1958 private:
1959 transwarp::detail::thread_pool pool_;
1960 };
1961
1962
1963 /// Detail namespace for internal functionality only
1964 namespace detail {
1965
1966 const transwarp::option_str nullopt_string;
1967 const transwarp::any_data any_empty;
1968
1969
1970 template<typename ResultType, bool is_void>
1971 struct make_future_functor;
1972
1973 template<typename ResultType>
1974 struct make_future_functor<ResultType, true> {
1975 template<typename Future, typename OtherFuture>
1976 void operator()(Future& future, const OtherFuture& other) const {
1977 other.get();
1978 future = transwarp::detail::make_ready_future();
1979 }
1980 };
1981
1982 template<typename ResultType>
1983 struct make_future_functor<ResultType, false> {
1984 template<typename Future, typename OtherFuture>
1985 void operator()(Future& future, const OtherFuture& other) const {
1986 future = transwarp::detail::make_future_with_value<ResultType>(other.get());
1987 }
1988 };
1989
1990
1991 /// Common task functionality shared across `task_impl` and `value_task`
1992 template<typename ResultType>
1993 class task_common : public transwarp::task<ResultType> {
1994 public:
1995 /// The result type of this task
1996 using result_type = ResultType;
1997
1998 /// The task's id
1999 std::size_t id() const noexcept override {
2000 return id_;
2001 }
2002
2003 /// The optional task name
2004 const transwarp::option_str& name() const noexcept override {
2005 #ifndef TRANSWARP_DISABLE_TASK_NAME
2006 return name_;
2007 #else
2008 return transwarp::detail::nullopt_string;
2009 #endif
2010 }
2011
2012 /// The task priority (defaults to 0)
2013 std::int64_t priority() const noexcept override {
2014 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2015 return priority_;
2016 #else
2017 return 0;
2018 #endif
2019 }
2020
2021 /// The custom task data (may not hold a value)
2022 const transwarp::any_data& custom_data() const noexcept override {
2023 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2024 return custom_data_;
2025 #else
2026 return transwarp::detail::any_empty;
2027 #endif
2028 }
2029
2030 /// Sets a task priority (defaults to 0). transwarp will not directly use this.
2031 /// This is only useful if something else is using the priority (e.g. a custom executor)
2032 void set_priority(std::int64_t priority) override {
2033 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2034 ensure_task_not_running();
2035 priority_ = priority;
2036 #else
2037 (void)priority;
2038 #endif
2039 }
2040
2041 /// Resets the task priority to 0
2042 void reset_priority() override {
2043 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2044 ensure_task_not_running();
2045 priority_ = 0;
2046 #endif
2047 }
2048
2049 /// Assigns custom data to this task. transwarp will not directly use this.
2050 /// This is only useful if something else is using this custom data (e.g. a custom executor)
2051 void set_custom_data(transwarp::any_data custom_data) override {
2052 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2053 ensure_task_not_running();
2054 if (!custom_data.has_value()) {
2055 throw transwarp::invalid_parameter{"custom data"};
2056 }
2057 custom_data_ = std::move(custom_data);
2058 raise_event(transwarp::event_type::after_custom_data_set);
2059 #else
2060 (void)custom_data;
2061 #endif
2062 }
2063
2064 /// Removes custom data from this task
2065 void remove_custom_data() override {
2066 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2067 ensure_task_not_running();
2068 custom_data_ = {};
2069 raise_event(transwarp::event_type::after_custom_data_set);
2070 #endif
2071 }
2072
2073 /// Returns the future associated to the underlying execution
2074 std::shared_future<result_type> future() const noexcept override {
2075 return future_;
2076 }
2077
2078 /// Adds a new listener for all event types
2079 void add_listener(std::shared_ptr<transwarp::listener> listener) override {
2080 ensure_task_not_running();
2081 check_listener(listener);
2082 ensure_listeners_object();
2083 for (int i=0; i<static_cast<int>(transwarp::event_type::count); ++i) {
2084 (*listeners_)[static_cast<transwarp::event_type>(i)].push_back(listener);
2085 }
2086 }
2087
2088 /// Adds a new listener for the given event type only
2089 void add_listener(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) override {
2090 ensure_task_not_running();
2091 check_listener(listener);
2092 ensure_listeners_object();
2093 (*listeners_)[event].push_back(std::move(listener));
2094 }
2095
2096 /// Removes the listener for all event types
2097 void remove_listener(const std::shared_ptr<transwarp::listener>& listener) override {
2098 ensure_task_not_running();
2099 check_listener(listener);
2100 if (!listeners_) {
2101 return;
2102 }
2103 for (int i=0; i<static_cast<int>(transwarp::event_type::count); ++i) {
2104 auto listeners_pair = listeners_->find(static_cast<transwarp::event_type>(i));
2105 if (listeners_pair != listeners_->end()) {
2106 std::vector<std::shared_ptr<transwarp::listener>>& l = listeners_pair->second;
2107 l.erase(std::remove(l.begin(), l.end(), listener), l.end());
2108 }
2109 }
2110 }
2111
2112 /// Removes the listener for the given event type only
2113 void remove_listener(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) override {
2114 ensure_task_not_running();
2115 check_listener(listener);
2116 if (!listeners_) {
2117 return;
2118 }
2119 auto listeners_pair = listeners_->find(event);
2120 if (listeners_pair != listeners_->end()) {
2121 std::vector<std::shared_ptr<transwarp::listener>>& l = listeners_pair->second;
2122 l.erase(std::remove(l.begin(), l.end(), listener), l.end());
2123 }
2124 }
2125
2126 /// Removes all listeners
2127 void remove_listeners() override {
2128 ensure_task_not_running();
2129 if (!listeners_) {
2130 return;
2131 }
2132 listeners_->clear();
2133 }
2134
2135 /// Removes all listeners for the given event type
2136 void remove_listeners(transwarp::event_type event) override {
2137 ensure_task_not_running();
2138 if (!listeners_) {
2139 return;
2140 }
2141 auto listeners_pair = listeners_->find(event);
2142 if (listeners_pair != listeners_->end()) {
2143 listeners_pair->second.clear();
2144 }
2145 }
2146
2147 protected:
2148
2149 using listeners_t = std::map<transwarp::event_type, std::vector<std::shared_ptr<transwarp::listener>>>;
2150 using tasks_t = std::vector<transwarp::itask*>;
2151
2152 /// Checks if the task is currently running and throws transwarp::control_error if it is
2153 void ensure_task_not_running() const {
2154 if (future_.valid() && future_.wait_for(std::chrono::seconds{0}) != std::future_status::ready) {
2155 throw transwarp::control_error{"task currently running: " + transwarp::to_string(*this, " ")};
2156 }
2157 }
2158
2159 /// Raises the given event to all listeners
2160 void raise_event(transwarp::event_type event) {
2161 if (!listeners_) {
2162 return;
2163 }
2164 auto listeners_pair = listeners_->find(event);
2165 if (listeners_pair != listeners_->end()) {
2166 for (const std::shared_ptr<transwarp::listener>& listener : listeners_pair->second) {
2167 listener->handle_event(event, *this);
2168 }
2169 }
2170 }
2171
2172 /// Check for non-null listener pointer
2173 void check_listener(const std::shared_ptr<transwarp::listener>& listener) const {
2174 if (!listener) {
2175 throw transwarp::invalid_parameter{"listener pointer"};
2176 }
2177 }
2178
2179 void ensure_listeners_object() {
2180 if (!listeners_) {
2181 listeners_.reset(new listeners_t);
2182 }
2183 }
2184
2185 /// Assigns the given id
2186 void set_id(std::size_t id) noexcept override {
2187 id_ = id;
2188 }
2189
2190 /// Assigns the given name
2191 void set_name(transwarp::option_str name) noexcept override {
2192 #ifndef TRANSWARP_DISABLE_TASK_NAME
2193 name_ = std::move(name);
2194 #else
2195 (void)name;
2196 #endif
2197 }
2198
2199 void copy_from(const task_common& task) {
2200 id_ = task.id_;
2201 #ifndef TRANSWARP_DISABLE_TASK_NAME
2202 name_ = task.name_;
2203 #endif
2204 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2205 priority_ = task.priority_;
2206 #endif
2207 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2208 custom_data_ = task.custom_data_;
2209 #endif
2210 if (task.has_result()) {
2211 try {
2212 make_future_functor<result_type, std::is_void<result_type>::value>{}(future_, task.future_);
2213 } catch (...) {
2214 future_ = transwarp::detail::make_future_with_exception<result_type>(std::current_exception());
2215 }
2216 }
2217 visited_ = task.visited_;
2218 if (task.listeners_) {
2219 listeners_.reset(new listeners_t(*task.listeners_));
2220 }
2221 }
2222
2223 std::size_t id_ = 0;
2224 #ifndef TRANSWARP_DISABLE_TASK_NAME
2225 transwarp::option_str name_;
2226 #endif
2227 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2228 std::int64_t priority_ = 0;
2229 #endif
2230 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2231 transwarp::any_data custom_data_;
2232 #endif
2233 std::shared_future<result_type> future_;
2234 bool visited_ = false;
2235 std::unique_ptr<listeners_t> listeners_;
2236 std::unique_ptr<tasks_t> tasks_;
2237 };
2238
2239
2240 /// The base task class that contains the functionality that can be used
2241 /// with all result types (void and non-void).
2242 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
2243 class task_impl_base : public transwarp::detail::task_common<ResultType> {
2244 public:
2245 /// The task type
2246 using task_type = TaskType;
2247
2248 /// The result type of this task
2249 using result_type = ResultType;
2250
2251 /// Can be called to explicitly finalize this task making this task
2252 /// the terminal task in the graph. This is also done implicitly when
2253 /// calling, e.g., any of the *_all methods. It should normally not be
2254 /// necessary to call this method directly
2255 void finalize() override {
2256 if (!this->tasks_) {
2257 this->tasks_.reset(new typename transwarp::detail::task_common<result_type>::tasks_t);
2258 visit(transwarp::detail::final_visitor{*this->tasks_});
2259 unvisit();
2260 auto compare = [](const transwarp::itask* const l, const transwarp::itask* const r) {
2261 const std::size_t l_level = l->level();
2262 const std::size_t l_id = l->id();
2263 const std::size_t r_level = r->level();
2264 const std::size_t r_id = r->id();
2265 return std::tie(l_level, l_id) < std::tie(r_level, r_id);
2266 };
2267 std::sort(this->tasks_->begin(), this->tasks_->end(), compare);
2268 }
2269 }
2270
2271 /// The task's level
2272 std::size_t level() const noexcept override {
2273 return level_;
2274 }
2275
2276 /// The task's type
2277 transwarp::task_type type() const noexcept override {
2278 return type_;
2279 }
2280
2281 /// The task's executor (may be null)
2282 std::shared_ptr<transwarp::executor> executor() const noexcept override {
2283 return executor_;
2284 }
2285
2286 /// Returns whether the associated task is canceled
2287 bool canceled() const noexcept override {
2288 return canceled_.load();
2289 }
2290
2291 /// Returns the average idletime in microseconds (-1 if never set)
2292 std::int64_t avg_idletime_us() const noexcept override {
2293 #ifndef TRANSWARP_DISABLE_TASK_TIME
2294 return avg_idletime_us_.load();
2295 #else
2296 return -1;
2297 #endif
2298 }
2299
2300 /// Returns the average waittime in microseconds (-1 if never set)
2301 std::int64_t avg_waittime_us() const noexcept override {
2302 #ifndef TRANSWARP_DISABLE_TASK_TIME
2303 return avg_waittime_us_.load();
2304 #else
2305 return -1;
2306 #endif
2307 }
2308
2309 /// Returns the average runtime in microseconds (-1 if never set)
2310 std::int64_t avg_runtime_us() const noexcept override {
2311 #ifndef TRANSWARP_DISABLE_TASK_TIME
2312 return avg_runtime_us_.load();
2313 #else
2314 return -1;
2315 #endif
2316 }
2317
2318 /// Assigns an executor to this task which takes precedence over
2319 /// the executor provided in schedule() or schedule_all()
2320 void set_executor(std::shared_ptr<transwarp::executor> executor) override {
2321 this->ensure_task_not_running();
2322 if (!executor) {
2323 throw transwarp::invalid_parameter{"executor pointer"};
2324 }
2325 executor_ = std::move(executor);
2326 }
2327
2328 /// Assigns an executor to all tasks which takes precedence over
2329 /// the executor provided in schedule() or schedule_all()
2330 void set_executor_all(std::shared_ptr<transwarp::executor> executor) override {
2331 this->ensure_task_not_running();
2332 transwarp::detail::set_executor_visitor visitor{std::move(executor)};
2333 visit_all(visitor);
2334 }
2335
2336 /// Removes the executor from this task
2337 void remove_executor() override {
2338 this->ensure_task_not_running();
2339 executor_.reset();
2340 }
2341
2342 /// Removes the executor from all tasks
2343 void remove_executor_all() override {
2344 this->ensure_task_not_running();
2345 transwarp::detail::remove_executor_visitor visitor;
2346 visit_all(visitor);
2347 }
2348
2349 /// Schedules this task for execution on the caller thread.
2350 /// The task-specific executor gets precedence if it exists.
2351 /// This overload will reset the underlying future.
2352 void schedule() override {
2353 this->ensure_task_not_running();
2354 this->schedule_impl(true);
2355 }
2356
2357 /// Schedules this task for execution on the caller thread.
2358 /// The task-specific executor gets precedence if it exists.
2359 /// reset denotes whether schedule should reset the underlying
2360 /// future and schedule even if the future is already valid.
2361 void schedule(bool reset) override {
2362 this->ensure_task_not_running();
2363 this->schedule_impl(reset);
2364 }
2365
2366 /// Schedules this task for execution using the provided executor.
2367 /// The task-specific executor gets precedence if it exists.
2368 /// This overload will reset the underlying future.
2369 void schedule(transwarp::executor& executor) override {
2370 this->ensure_task_not_running();
2371 this->schedule_impl(true, &executor);
2372 }
2373
2374 /// Schedules this task for execution using the provided executor.
2375 /// The task-specific executor gets precedence if it exists.
2376 /// reset denotes whether schedule should reset the underlying
2377 /// future and schedule even if the future is already valid.
2378 void schedule(transwarp::executor& executor, bool reset) override {
2379 this->ensure_task_not_running();
2380 this->schedule_impl(reset, &executor);
2381 }
2382
2383 /// Schedules all tasks in the graph for execution on the caller thread.
2384 /// The task-specific executors get precedence if they exist.
2385 /// This overload will reset the underlying futures.
2386 void schedule_all() override {
2387 this->ensure_task_not_running();
2388 schedule_all_impl(true);
2389 }
2390
2391 /// Schedules all tasks in the graph for execution using the provided executor.
2392 /// The task-specific executors get precedence if they exist.
2393 /// This overload will reset the underlying futures.
2394 void schedule_all(transwarp::executor& executor) override {
2395 this->ensure_task_not_running();
2396 schedule_all_impl(true, &executor);
2397 }
2398
2399 /// Schedules all tasks in the graph for execution on the caller thread.
2400 /// The task-specific executors get precedence if they exist.
2401 /// reset_all denotes whether schedule_all should reset the underlying
2402 /// futures and schedule even if the futures are already present.
2403 void schedule_all(bool reset_all) override {
2404 this->ensure_task_not_running();
2405 schedule_all_impl(reset_all);
2406 }
2407
2408 /// Schedules all tasks in the graph for execution using the provided executor.
2409 /// The task-specific executors get precedence if they exist.
2410 /// reset_all denotes whether schedule_all should reset the underlying
2411 /// futures and schedule even if the futures are already present.
2412 void schedule_all(transwarp::executor& executor, bool reset_all) override {
2413 this->ensure_task_not_running();
2414 schedule_all_impl(reset_all, &executor);
2415 }
2416
2417 /// Assigns an exception to this task. Scheduling will have no effect after an exception
2418 /// has been set. Calling reset() will remove the exception and re-enable scheduling.
2419 void set_exception(std::exception_ptr exception) override {
2420 this->ensure_task_not_running();
2421 this->future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
2422 schedule_mode_ = false;
2423 this->raise_event(transwarp::event_type::after_future_changed);
2424 }
2425
2426 /// Returns whether the task was scheduled and not reset afterwards.
2427 /// This means that the underlying future is valid
2428 bool was_scheduled() const noexcept override {
2429 return this->future_.valid();
2430 }
2431
2432 /// Waits for the task to complete. Should only be called if was_scheduled()
2433 /// is true, throws transwarp::control_error otherwise
2434 void wait() const override {
2435 ensure_task_was_scheduled();
2436 this->future_.wait();
2437 }
2438
2439 /// Returns whether the task has finished processing. Should only be called
2440 /// if was_scheduled() is true, throws transwarp::control_error otherwise
2441 bool is_ready() const override {
2442 ensure_task_was_scheduled();
2443 return this->future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
2444 }
2445
2446 /// Returns whether this task contains a result
2447 bool has_result() const noexcept override {
2448 return was_scheduled() && this->future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
2449 }
2450
2451 /// Resets this task
2452 void reset() override {
2453 this->ensure_task_not_running();
2454 this->future_ = std::shared_future<result_type>{};
2455 cancel(false);
2456 schedule_mode_ = true;
2457 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2458 refcount_ = childcount_;
2459 #endif
2460 this->raise_event(transwarp::event_type::after_future_changed);
2461 }
2462
2463 /// Resets all tasks in the graph
2464 void reset_all() override {
2465 this->ensure_task_not_running();
2466 transwarp::detail::reset_visitor visitor;
2467 visit_all(visitor);
2468 }
2469
2470 /// If enabled then this task is canceled which will
2471 /// throw transwarp::task_canceled when retrieving the task result.
2472 /// Passing false is equivalent to resume.
2473 void cancel(bool enabled) noexcept override {
2474 canceled_ = enabled;
2475 }
2476
2477 /// If enabled then all pending tasks in the graph are canceled which will
2478 /// throw transwarp::task_canceled when retrieving the task result.
2479 /// Passing false is equivalent to resume.
2480 void cancel_all(bool enabled) noexcept override {
2481 transwarp::detail::cancel_visitor visitor{enabled};
2482 visit_all(visitor);
2483 }
2484
2485 /// Sets a priority to all tasks (defaults to 0). transwarp will not directly use this.
2486 /// This is only useful if something else is using the priority (e.g. a custom executor)
2487 void set_priority_all(std::int64_t priority) override {
2488 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2489 this->ensure_task_not_running();
2490 transwarp::detail::set_priority_visitor visitor{priority};
2491 visit_all(visitor);
2492 #else
2493 (void)priority;
2494 #endif
2495 }
2496
2497 /// Resets the priority of all tasks to 0
2498 void reset_priority_all() override {
2499 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2500 this->ensure_task_not_running();
2501 transwarp::detail::reset_priority_visitor visitor;
2502 visit_all(visitor);
2503 #endif
2504 }
2505
2506 /// Assigns custom data to all tasks. transwarp will not directly use this.
2507 /// This is only useful if something else is using this custom data (e.g. a custom executor)
2508 void set_custom_data_all(transwarp::any_data custom_data) override {
2509 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2510 this->ensure_task_not_running();
2511 transwarp::detail::set_custom_data_visitor visitor{std::move(custom_data)};
2512 visit_all(visitor);
2513 #else
2514 (void)custom_data;
2515 #endif
2516 }
2517
2518 /// Removes custom data from all tasks
2519 void remove_custom_data_all() override {
2520 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2521 this->ensure_task_not_running();
2522 transwarp::detail::remove_custom_data_visitor visitor;
2523 visit_all(visitor);
2524 #endif
2525 }
2526
2527 /// Adds a new listener for all event types and for all parents
2528 void add_listener_all(std::shared_ptr<transwarp::listener> listener) override {
2529 this->ensure_task_not_running();
2530 transwarp::detail::add_listener_visitor visitor{std::move(listener)};
2531 visit_all(visitor);
2532 }
2533
2534 /// Adds a new listener for the given event type only and for all parents
2535 void add_listener_all(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) override {
2536 this->ensure_task_not_running();
2537 transwarp::detail::add_listener_per_event_visitor visitor{event, std::move(listener)};
2538 visit_all(visitor);
2539 }
2540
2541 /// Removes the listener for all event types and for all parents
2542 void remove_listener_all(const std::shared_ptr<transwarp::listener>& listener) override {
2543 this->ensure_task_not_running();
2544 transwarp::detail::remove_listener_visitor visitor{std::move(listener)};
2545 visit_all(visitor);
2546 }
2547
2548 /// Removes the listener for the given event type only and for all parents
2549 void remove_listener_all(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) override {
2550 this->ensure_task_not_running();
2551 transwarp::detail::remove_listener_per_event_visitor visitor{event, std::move(listener)};
2552 visit_all(visitor);
2553 }
2554
2555 /// Removes all listeners and for all parents
2556 void remove_listeners_all() override {
2557 this->ensure_task_not_running();
2558 transwarp::detail::remove_listeners_visitor visitor;
2559 visit_all(visitor);
2560 }
2561
2562 /// Removes all listeners for the given event type and for all parents
2563 void remove_listeners_all(transwarp::event_type event) override {
2564 this->ensure_task_not_running();
2565 transwarp::detail::remove_listeners_per_event_visitor visitor{event};
2566 visit_all(visitor);
2567 }
2568
2569 /// Returns the task's parents (may be empty)
2570 std::vector<transwarp::itask*> parents() const override {
2571 return transwarp::detail::parents<ParentResults...>::tasks(parents_);
2572 }
2573
2574 /// Returns all tasks in the graph in breadth order
2575 const std::vector<transwarp::itask*>& tasks() override {
2576 finalize();
2577 return *this->tasks_;
2578 }
2579
2580 /// Returns all edges in the graph. This is mainly for visualizing
2581 /// the tasks and their interdependencies. Pass the result into transwarp::to_string
2582 /// to retrieve a dot-style graph representation for easy viewing.
2583 std::vector<transwarp::edge> edges() override {
2584 std::vector<transwarp::edge> edges;
2585 transwarp::detail::edges_visitor visitor{edges};
2586 visit_all(visitor);
2587 return edges;
2588 }
2589
2590 protected:
2591
2592 task_impl_base() {}
2593
2594 template<typename F>
2595 task_impl_base(F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2596 : functor_(new Functor(std::forward<F>(functor))),
2597 parents_(std::move(parents)...)
2598 {
2599 init();
2600 }
2601
2602 template<typename F, typename P>
2603 task_impl_base(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2604 : functor_(new Functor(std::forward<F>(functor))),
2605 parents_(std::move(parents))
2606 {
2607 init();
2608 if (parents_.empty()) {
2609 set_type(transwarp::task_type::root);
2610 }
2611 }
2612
2613 void init() {
2614 set_type(task_type::value);
2615 transwarp::detail::assign_task_if(*functor_, *this);
2616 transwarp::detail::call_with_each(transwarp::detail::parent_visitor{*this}, parents_);
2617 }
2618
2619 template<typename R, typename Y, typename T, typename P>
2620 friend class transwarp::detail::runner;
2621
2622 template<typename R, typename T, typename... A>
2623 friend R transwarp::detail::run_task(std::size_t, const std::weak_ptr<T>&, A&&...);
2624
2625 /// Assigns the given level
2626 void set_level(std::size_t level) noexcept override {
2627 level_ = level;
2628 }
2629
2630 /// Assigns the given type
2631 void set_type(transwarp::task_type type) noexcept override {
2632 type_ = type;
2633 }
2634
2635 /// Assigns the given idletime
2636 void set_avg_idletime_us(std::int64_t idletime) noexcept override {
2637 #ifndef TRANSWARP_DISABLE_TASK_TIME
2638 avg_idletime_us_ = idletime;
2639 #else
2640 (void)idletime;
2641 #endif
2642 }
2643
2644 /// Assigns the given waittime
2645 void set_avg_waittime_us(std::int64_t waittime) noexcept override {
2646 #ifndef TRANSWARP_DISABLE_TASK_TIME
2647 avg_waittime_us_ = waittime;
2648 #else
2649 (void)waittime;
2650 #endif
2651 }
2652
2653 /// Assigns the given runtime
2654 void set_avg_runtime_us(std::int64_t runtime) noexcept override {
2655 #ifndef TRANSWARP_DISABLE_TASK_TIME
2656 avg_runtime_us_ = runtime;
2657 #else
2658 (void)runtime;
2659 #endif
2660 }
2661
2662 /// Checks if the task was scheduled and throws transwarp::control_error if it's not
2663 void ensure_task_was_scheduled() const {
2664 if (!this->future_.valid()) {
2665 throw transwarp::control_error{"task was not scheduled: " + transwarp::to_string(*this, " ")};
2666 }
2667 }
2668
2669 /// Schedules this task for execution using the provided executor.
2670 /// The task-specific executor gets precedence if it exists.
2671 /// Runs the task on the same thread as the caller if neither the global
2672 /// nor the task-specific executor is found.
2673 void schedule_impl(bool reset, transwarp::executor* executor=nullptr) override {
2674 if (schedule_mode_ && (reset || !this->future_.valid())) {
2675 if (reset) {
2676 cancel(false);
2677 }
2678 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2679 refcount_ = childcount_;
2680 #endif
2681 std::weak_ptr<task_impl_base> self = std::static_pointer_cast<task_impl_base>(this->shared_from_this());
2682 using runner_t = transwarp::detail::runner<result_type, task_type, task_impl_base, decltype(parents_)>;
2683 std::shared_ptr<runner_t> runner = std::shared_ptr<runner_t>(new runner_t(this->id(), self, parents_));
2684 this->raise_event(transwarp::event_type::before_scheduled);
2685 this->future_ = runner->future();
2686 this->raise_event(transwarp::event_type::after_future_changed);
2687 if (this->executor_) {
2688 this->executor_->execute([runner]{ (*runner)(); }, *this);
2689 } else if (executor) {
2690 executor->execute([runner]{ (*runner)(); }, *this);
2691 } else {
2692 (*runner)();
2693 }
2694 }
2695 }
2696
2697 /// Schedules all tasks in the graph for execution using the provided executor.
2698 /// The task-specific executors get precedence if they exist.
2699 /// Runs tasks on the same thread as the caller if neither the global
2700 /// nor a task-specific executor is found.
2701 void schedule_all_impl(bool reset_all, transwarp::executor* executor=nullptr) {
2702 transwarp::detail::schedule_visitor visitor{reset_all, executor};
2703 visit_all(visitor);
2704 }
2705
2706 /// Visits each task in a depth-first traversal
2707 void visit(const std::function<void(transwarp::itask&)>& visitor) override {
2708 if (!this->visited_) {
2709 transwarp::detail::call_with_each(transwarp::detail::visit_visitor{visitor}, parents_);
2710 visitor(*this);
2711 this->visited_ = true;
2712 }
2713 }
2714
2715 /// Traverses through each task and marks them as not visited.
2716 void unvisit() noexcept override {
2717 if (this->visited_) {
2718 this->visited_ = false;
2719 transwarp::detail::call_with_each(transwarp::detail::unvisit_visitor{}, parents_);
2720 }
2721 }
2722
2723 /// Visits all tasks
2724 template<typename Visitor>
2725 void visit_all(Visitor& visitor) {
2726 finalize();
2727 for (transwarp::itask* t : *this->tasks_) {
2728 visitor(*t);
2729 }
2730 }
2731
2732 void increment_childcount() noexcept override {
2733 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2734 ++childcount_;
2735 #endif
2736 }
2737
2738 void decrement_refcount() override {
2739 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2740 if (--refcount_ == 0) {
2741 this->raise_event(transwarp::event_type::after_satisfied);
2742 }
2743 #endif
2744 }
2745
2746 void reset_future() override {
2747 this->future_ = std::shared_future<result_type>{};
2748 this->raise_event(transwarp::event_type::after_future_changed);
2749 }
2750
2751 std::size_t level_ = 0;
2752 transwarp::task_type type_ = transwarp::task_type::root;
2753 std::shared_ptr<transwarp::executor> executor_;
2754 std::atomic<bool> canceled_{false};
2755 bool schedule_mode_ = true;
2756 #ifndef TRANSWARP_DISABLE_TASK_TIME
2757 std::atomic<std::int64_t> avg_idletime_us_{-1};
2758 std::atomic<std::int64_t> avg_waittime_us_{-1};
2759 std::atomic<std::int64_t> avg_runtime_us_{-1};
2760 #endif
2761 std::unique_ptr<Functor> functor_;
2762 typename transwarp::detail::parents<ParentResults...>::type parents_;
2763 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2764 std::size_t childcount_ = 0;
2765 std::atomic<std::size_t> refcount_{0};
2766 #endif
2767 };
2768
2769
2770 /// A task proxy
2771 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
2772 class task_impl_proxy : public transwarp::detail::task_impl_base<ResultType, TaskType, Functor, ParentResults...> {
2773 public:
2774 /// The task type
2775 using task_type = TaskType;
2776
2777 /// The result type of this task
2778 using result_type = ResultType;
2779
2780 /// Assigns a value to this task. Scheduling will have no effect after a value
2781 /// has been set. Calling reset() will remove the value and re-enable scheduling.
2782 void set_value(const typename transwarp::decay<result_type>::type& value) override {
2783 this->ensure_task_not_running();
2784 this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2785 this->schedule_mode_ = false;
2786 this->raise_event(transwarp::event_type::after_future_changed);
2787 }
2788
2789 /// Assigns a value to this task. Scheduling will have no effect after a value
2790 /// has been set. Calling reset() will remove the value and re-enable scheduling.
2791 void set_value(typename transwarp::decay<result_type>::type&& value) override {
2792 this->ensure_task_not_running();
2793 this->future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2794 this->schedule_mode_ = false;
2795 this->raise_event(transwarp::event_type::after_future_changed);
2796 }
2797
2798 /// Returns the result of this task. Throws any exceptions that the underlying
2799 /// functor throws. Should only be called if was_scheduled() is true,
2800 /// throws transwarp::control_error otherwise
2801 typename transwarp::result<result_type>::type get() const override {
2802 this->ensure_task_was_scheduled();
2803 return this->future_.get();
2804 }
2805
2806 protected:
2807
2808 task_impl_proxy() = default;
2809
2810 template<typename F>
2811 task_impl_proxy(F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2812 : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents)...)
2813 {}
2814
2815 template<typename F, typename P>
2816 task_impl_proxy(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2817 : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents))
2818 {}
2819
2820 };
2821
2822 /// A task proxy for reference result type.
2823 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
2824 class task_impl_proxy<ResultType&, TaskType, Functor, ParentResults...> : public transwarp::detail::task_impl_base<ResultType&, TaskType, Functor, ParentResults...> {
2825 public:
2826 /// The task type
2827 using task_type = TaskType;
2828
2829 /// The result type of this task
2830 using result_type = ResultType&;
2831
2832 /// Assigns a value to this task. Scheduling will have no effect after a value
2833 /// has been set. Calling reset() will remove the value and re-enable scheduling.
2834 void set_value(typename transwarp::decay<result_type>::type& value) override {
2835 this->ensure_task_not_running();
2836 this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2837 this->schedule_mode_ = false;
2838 this->raise_event(transwarp::event_type::after_future_changed);
2839 }
2840
2841 /// Returns the result of this task. Throws any exceptions that the underlying
2842 /// functor throws. Should only be called if was_scheduled() is true,
2843 /// throws transwarp::control_error otherwise
2844 typename transwarp::result<result_type>::type get() const override {
2845 this->ensure_task_was_scheduled();
2846 return this->future_.get();
2847 }
2848
2849 protected:
2850
2851 task_impl_proxy() = default;
2852
2853 template<typename F>
2854 task_impl_proxy(F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2855 : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents)...)
2856 {}
2857
2858 template<typename F, typename P>
2859 task_impl_proxy(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2860 : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents))
2861 {}
2862
2863 };
2864
2865 /// A task proxy for void result type.
2866 template<typename TaskType, typename Functor, typename... ParentResults>
2867 class task_impl_proxy<void, TaskType, Functor, ParentResults...> : public transwarp::detail::task_impl_base<void, TaskType, Functor, ParentResults...> {
2868 public:
2869 /// The task type
2870 using task_type = TaskType;
2871
2872 /// The result type of this task
2873 using result_type = void;
2874
2875 /// Assigns a value to this task. Scheduling will have no effect after a call
2876 /// to this. Calling reset() will reset this and re-enable scheduling.
2877 void set_value() override {
2878 this->ensure_task_not_running();
2879 this->future_ = transwarp::detail::make_ready_future();
2880 this->schedule_mode_ = false;
2881 this->raise_event(transwarp::event_type::after_future_changed);
2882 }
2883
2884 /// Blocks until the task finishes. Throws any exceptions that the underlying
2885 /// functor throws. Should only be called if was_scheduled() is true,
2886 /// throws transwarp::control_error otherwise
2887 void get() const override {
2888 this->ensure_task_was_scheduled();
2889 this->future_.get();
2890 }
2891
2892 protected:
2893
2894 task_impl_proxy() = default;
2895
2896 template<typename F>
2897 task_impl_proxy(F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2898 : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents)...)
2899 {}
2900
2901 template<typename F, typename P>
2902 task_impl_proxy(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2903 : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents))
2904 {}
2905
2906 };
2907
2908 } // detail
2909
2910
2911 /// A task representing a piece of work given by functor and parent tasks.
2912 /// By connecting tasks a directed acyclic graph is built.
2913 /// Tasks should be created using the make_task factory functions.
2914 template<typename TaskType, typename Functor, typename... ParentResults>
2915 class task_impl : public transwarp::detail::task_impl_proxy<typename transwarp::detail::functor_result<TaskType, Functor, ParentResults...>::type, TaskType, Functor, ParentResults...> {
2916 public:
2917 /// The task type
2918 using task_type = TaskType;
2919
2920 /// The result type of this task
2921 using result_type = typename transwarp::detail::functor_result<TaskType, Functor, ParentResults...>::type;
2922
2923 /// A task is defined by functor and parent tasks.
2924 /// Note: Don't use this constructor directly, use transwarp::make_task
2925 template<typename F>
2926 task_impl(F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2927 : transwarp::detail::task_impl_proxy<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents)...)
2928 {}
2929
2930 /// A task is defined by functor and parent tasks.
2931 /// Note: Don't use this constructor directly, use transwarp::make_task
2932 template<typename F, typename P>
2933 task_impl(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2934 : transwarp::detail::task_impl_proxy<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents))
2935 {}
2936
2937 // delete copy/move semantics
2938 task_impl(const task_impl&) = delete;
2939 task_impl& operator=(const task_impl&) = delete;
2940 task_impl(task_impl&&) = delete;
2941 task_impl& operator=(task_impl&&) = delete;
2942
2943 /// Gives this task a name and returns a ptr to itself
2944 std::shared_ptr<task_impl> named(std::string name) {
2945 #ifndef TRANSWARP_DISABLE_TASK_NAME
2946 #ifdef TRANSWARP_CPP11
2947 this->set_name(transwarp::option_str{std::move(name)});
2948 #else
2949 this->set_name(std::make_optional(std::move(name)));
2950 #endif
2951 #else
2952 (void)name;
2953 #endif
2954 return std::static_pointer_cast<task_impl>(this->shared_from_this());
2955 }
2956
2957 /// Creates a continuation to this task
2958 template<typename TaskType_, typename Functor_>
2959 auto then(TaskType_, Functor_&& functor) -> std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>> {
2960 using task_t = transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>;
2961 return std::shared_ptr<task_t>(new task_t(std::forward<Functor_>(functor), std::static_pointer_cast<task_impl>(this->shared_from_this())));
2962 }
2963
2964 /// Clones this task and casts the result to a ptr to task_impl
2965 std::shared_ptr<task_impl> clone_cast() const {
2966 return std::static_pointer_cast<task_impl>(this->clone());
2967 }
2968
2969 private:
2970
2971 task_impl() = default;
2972
2973 std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const override {
2974 auto t = std::shared_ptr<task_impl>{new task_impl};
2975 t->copy_from(*this);
2976 t->level_ = this->level_;
2977 t->type_ = this->type_;
2978 t->executor_ = this->executor_;
2979 t->canceled_ = this->canceled_.load();
2980 t->schedule_mode_ = this->schedule_mode_;
2981 #ifndef TRANSWARP_DISABLE_TASK_TIME
2982 t->avg_idletime_us_ = this->avg_idletime_us_.load();
2983 t->avg_waittime_us_ = this->avg_waittime_us_.load();
2984 t->avg_runtime_us_ = this->avg_runtime_us_.load();
2985 #endif
2986 t->functor_.reset(new Functor(*this->functor_));
2987 t->parents_ = transwarp::detail::parents<ParentResults...>::clone(task_cache, this->parents_);
2988 t->executor_ = this->executor_;
2989 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2990 t->childcount_ = this->childcount_;
2991 #endif
2992 return t;
2993 }
2994
2995 };
2996
2997
2998 /// A value task that stores a single value and doesn't require scheduling.
2999 /// Value tasks should be created using the make_value_task factory functions.
3000 template<typename ResultType>
3001 class value_task : public transwarp::detail::task_common<ResultType> {
3002 public:
3003 /// The task type
3004 using task_type = transwarp::root_type;
3005
3006 /// The result type of this task
3007 using result_type = ResultType;
3008
3009 /// A value task is defined by a given value.
3010 /// Note: Don't use this constructor directly, use transwarp::make_value_task
3011 template<typename T>
3012 value_task(T&& value)
3013 {
3014 this->future_ = transwarp::detail::make_future_with_value<result_type>(std::forward<T>(value));
3015 this->tasks_.reset(new typename transwarp::detail::task_common<result_type>::tasks_t{this});
3016 }
3017
3018 // delete copy/move semantics
3019 value_task(const value_task&) = delete;
3020 value_task& operator=(const value_task&) = delete;
3021 value_task(value_task&&) = delete;
3022 value_task& operator=(value_task&&) = delete;
3023
3024 /// Gives this task a name and returns a ptr to itself
3025 std::shared_ptr<value_task> named(std::string name) {
3026 #ifndef TRANSWARP_DISABLE_TASK_NAME
3027 #ifdef TRANSWARP_CPP11
3028 this->set_name(transwarp::option_str{std::move(name)});
3029 #else
3030 this->set_name(std::make_optional(std::move(name)));
3031 #endif
3032 #else
3033 (void)name;
3034 #endif
3035 return std::static_pointer_cast<value_task>(this->shared_from_this());
3036 }
3037
3038 /// Creates a continuation to this task
3039 template<typename TaskType_, typename Functor_>
3040 auto then(TaskType_, Functor_&& functor) -> std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>> {
3041 using task_t = transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>;
3042 return std::shared_ptr<task_t>(new task_t(std::forward<Functor_>(functor), std::static_pointer_cast<value_task>(this->shared_from_this())));
3043 }
3044
3045 /// Clones this task and casts the result to a ptr to value_task
3046 std::shared_ptr<value_task> clone_cast() const {
3047 return std::static_pointer_cast<value_task>(this->clone());
3048 }
3049
3050 /// Nothing to be done to finalize a value task
3051 void finalize() override {}
3052
3053 /// The task's level
3054 std::size_t level() const noexcept override {
3055 return 0;
3056 }
3057
3058 /// The task's type
3059 transwarp::task_type type() const noexcept override {
3060 return transwarp::task_type::root;
3061 }
3062
3063 /// Value tasks don't have executors as they don't run
3064 std::shared_ptr<transwarp::executor> executor() const noexcept override {
3065 return nullptr;
3066 }
3067
3068 /// Value tasks cannot be canceled
3069 bool canceled() const noexcept override {
3070 return false;
3071 }
3072
3073 /// Returns -1 as value tasks don't run
3074 std::int64_t avg_idletime_us() const noexcept override {
3075 return -1;
3076 }
3077
3078 /// Returns -1 as value tasks don't run
3079 std::int64_t avg_waittime_us() const noexcept override {
3080 return -1;
3081 }
3082
3083 /// Returns -1 as value tasks don't run
3084 std::int64_t avg_runtime_us() const noexcept override {
3085 return -1;
3086 }
3087
3088 /// No-op because a value task never runs
3089 void set_executor(std::shared_ptr<transwarp::executor>) override {}
3090
3091 /// No-op because a value task never runs and doesn't have parents
3092 void set_executor_all(std::shared_ptr<transwarp::executor>) override {}
3093
3094 /// No-op because a value task never runs
3095 void remove_executor() override {}
3096
3097 /// No-op because a value task never runs and doesn't have parents
3098 void remove_executor_all() override {}
3099
3100 /// Sets a priority to all tasks (defaults to 0). transwarp will not directly use this.
3101 /// This is only useful if something else is using the priority
3102 void set_priority_all(std::int64_t priority) override {
3103 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
3104 this->set_priority(priority);
3105 #else
3106 (void)priority;
3107 #endif
3108 }
3109
3110 /// Resets the priority of all tasks to 0
3111 void reset_priority_all() override {
3112 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
3113 this->reset_priority();
3114 #endif
3115 }
3116
3117 /// Assigns custom data to all tasks. transwarp will not directly use this.
3118 /// This is only useful if something else is using this custom data
3119 void set_custom_data_all(transwarp::any_data custom_data) override {
3120 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
3121 this->set_custom_data(std::move(custom_data));
3122 #else
3123 (void)custom_data;
3124 #endif
3125 }
3126
3127 /// Removes custom data from all tasks
3128 void remove_custom_data_all() override {
3129 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
3130 this->remove_custom_data();
3131 #endif
3132 }
3133
3134 /// No-op because a value task never runs
3135 void schedule() override {}
3136
3137 /// No-op because a value task never runs
3138 void schedule(transwarp::executor&) override {}
3139
3140 /// No-op because a value task never runs
3141 void schedule(bool) override {}
3142
3143 /// No-op because a value task never runs
3144 void schedule(transwarp::executor&, bool) override {}
3145
3146 /// No-op because a value task never runs and doesn't have parents
3147 void schedule_all() override {}
3148
3149 /// No-op because a value task never runs and doesn't have parents
3150 void schedule_all(transwarp::executor&) override {}
3151
3152 /// No-op because a value task never runs and doesn't have parents
3153 void schedule_all(bool) override {}
3154
3155 /// No-op because a value task never runs and doesn't have parents
3156 void schedule_all(transwarp::executor&, bool) override {}
3157
3158 /// Assigns a value to this task
3159 void set_value(const typename transwarp::decay<result_type>::type& value) override {
3160 this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
3161 this->raise_event(transwarp::event_type::after_future_changed);
3162 }
3163
3164 /// Assigns a value to this task
3165 void set_value(typename transwarp::decay<result_type>::type&& value) override {
3166 this->future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
3167 this->raise_event(transwarp::event_type::after_future_changed);
3168 }
3169
3170 /// Assigns an exception to this task
3171 void set_exception(std::exception_ptr exception) override {
3172 this->future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
3173 this->raise_event(transwarp::event_type::after_future_changed);
3174 }
3175
3176 /// Returns the value of this task. Throws an exception if this task has an exception assigned to it
3177 typename transwarp::result<result_type>::type get() const override {
3178 return this->future_.get();
3179 }
3180
3181 /// Returns true because a value task is scheduled once on construction
3182 bool was_scheduled() const noexcept override {
3183 return true;
3184 }
3185
3186 /// No-op because a value task never runs
3187 void wait() const override {}
3188
3189 /// Returns true because a value task is always ready
3190 bool is_ready() const override {
3191 return true;
3192 }
3193
3194 /// Returns true because a value task always contains a result
3195 bool has_result() const noexcept override {
3196 return true;
3197 }
3198
3199 /// No-op because a value task never runs
3200 void reset() override {}
3201
3202 /// No-op because a value task never runs and doesn't have parents
3203 void reset_all() override {}
3204
3205 /// No-op because a value task never runs
3206 void cancel(bool) noexcept override {}
3207
3208 /// No-op because a value task never runs and doesn't have parents
3209 void cancel_all(bool) noexcept override {}
3210
3211 /// Adds a new listener for all event types and for all parents
3212 void add_listener_all(std::shared_ptr<transwarp::listener> listener) override {
3213 this->add_listener(listener);
3214 }
3215
3216 /// Adds a new listener for the given event type only and for all parents
3217 void add_listener_all(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) override {
3218 this->add_listener(event, listener);
3219 }
3220
3221 /// Removes the listener for all event types and for all parents
3222 void remove_listener_all(const std::shared_ptr<transwarp::listener>& listener) override {
3223 this->remove_listener(listener);
3224 }
3225
3226 /// Removes the listener for the given event type only and for all parents
3227 void remove_listener_all(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) override {
3228 this->remove_listener(event, listener);
3229 }
3230
3231 /// Removes all listeners and for all parents
3232 void remove_listeners_all() override {
3233 this->remove_listeners();
3234 }
3235
3236 /// Removes all listeners for the given event type and for all parents
3237 void remove_listeners_all(transwarp::event_type event) override {
3238 this->remove_listeners(event);
3239 }
3240
3241 /// Empty because a value task doesn't have parents
3242 std::vector<transwarp::itask*> parents() const override {
3243 return {};
3244 }
3245
3246 /// Returns all tasks in the graph in breadth order
3247 const std::vector<transwarp::itask*>& tasks() override {
3248 return *this->tasks_;
3249 }
3250
3251 /// Returns empty edges because a value task doesn't have parents
3252 std::vector<transwarp::edge> edges() override {
3253 return {};
3254 }
3255
3256 private:
3257
3258 value_task()
3259 {
3260 this->tasks_.reset(new typename transwarp::detail::task_common<result_type>::tasks_t{this});
3261 }
3262
3263 std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>&) const override {
3264 auto t = std::shared_ptr<value_task>(new value_task);
3265 t->copy_from(*this);
3266 return t;
3267 }
3268
3269 /// No-op as value tasks are always at level 0
3270 void set_level(std::size_t) noexcept override {}
3271
3272 /// No-op as value tasks are always root tasks
3273 void set_type(transwarp::task_type) noexcept override {}
3274
3275 /// No-op as value tasks don't run
3276 void set_avg_idletime_us(std::int64_t) noexcept override {}
3277
3278 /// No-op as value tasks don't run
3279 void set_avg_waittime_us(std::int64_t) noexcept override {}
3280
3281 /// No-op as value tasks don't run
3282 void set_avg_runtime_us(std::int64_t) noexcept override {}
3283
3284 /// No-op because a value task never runs
3285 void schedule_impl(bool, transwarp::executor*) override {}
3286
3287 /// Visits this task
3288 void visit(const std::function<void(transwarp::itask&)>& visitor) override {
3289 if (!this->visited_) {
3290 visitor(*this);
3291 this->visited_ = true;
3292 }
3293 }
3294
3295 /// Marks this task as not visited
3296 void unvisit() noexcept override {
3297 this->visited_ = false;
3298 }
3299
3300 void increment_childcount() noexcept override {}
3301
3302 void decrement_refcount() override {}
3303
3304 void reset_future() override {}
3305
3306 };
3307
3308
3309 /// A factory function to create a new task
3310 template<typename TaskType, typename Functor, typename... Parents>
3311 auto make_task(TaskType, Functor&& functor, std::shared_ptr<Parents>... parents) -> std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, typename Parents::result_type...>> {
3312 using task_t = transwarp::task_impl<TaskType, typename std::decay<Functor>::type, typename Parents::result_type...>;
3313 return std::shared_ptr<task_t>(new task_t(std::forward<Functor>(functor), std::move(parents)...));
3314 }
3315
3316
3317 /// A factory function to create a new task with vector parents
3318 template<typename TaskType, typename Functor, typename ParentType>
3319 auto make_task(TaskType, Functor&& functor, std::vector<ParentType> parents) -> std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>> {
3320 using task_t = transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>;
3321 return std::shared_ptr<task_t>(new task_t(std::forward<Functor>(functor), std::move(parents)));
3322 }
3323
3324
3325 /// A factory function to create a new value task
3326 template<typename Value>
3327 auto make_value_task(Value&& value) -> std::shared_ptr<transwarp::value_task<typename transwarp::decay<Value>::type>> {
3328 using task_t = transwarp::value_task<typename transwarp::decay<Value>::type>;
3329 return std::shared_ptr<task_t>(new task_t(std::forward<Value>(value)));
3330 }
3331
3332
3333 /// A function similar to std::for_each but returning a transwarp task for
3334 /// deferred, possibly asynchronous execution. This function creates a graph
3335 /// with std::distance(first, last) root tasks
3336 template<typename InputIt, typename UnaryOperation>
3337 auto for_each(InputIt first, InputIt last, UnaryOperation unary_op) -> std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>> {
3338 const auto distance = std::distance(first, last);
3339 if (distance <= 0) {
3340 throw transwarp::invalid_parameter{"first or last"};
3341 }
3342 std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
3343 tasks.reserve(static_cast<std::size_t>(distance));
3344 for (; first != last; ++first) {
3345 tasks.push_back(transwarp::make_task(transwarp::root, [unary_op,first]{ unary_op(*first); }));
3346 }
3347 return transwarp::make_task(transwarp::wait, transwarp::no_op, tasks);
3348 }
3349
3350 /// A function similar to std::for_each but returning a transwarp task for
3351 /// deferred, possibly asynchronous execution. This function creates a graph
3352 /// with std::distance(first, last) root tasks.
3353 /// Overload for automatic scheduling by passing an executor.
3354 template<typename InputIt, typename UnaryOperation>
3355 auto for_each(transwarp::executor& executor, InputIt first, InputIt last, UnaryOperation unary_op) -> std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>> {
3356 auto task = transwarp::for_each(first, last, unary_op);
3357 task->schedule_all(executor);
3358 return task;
3359 }
3360
3361
3362 /// A function similar to std::transform but returning a transwarp task for
3363 /// deferred, possibly asynchronous execution. This function creates a graph
3364 /// with std::distance(first1, last1) root tasks
3365 template<typename InputIt, typename OutputIt, typename UnaryOperation>
3366 auto transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) -> std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>> {
3367 const auto distance = std::distance(first1, last1);
3368 if (distance <= 0) {
3369 throw transwarp::invalid_parameter{"first1 or last1"};
3370 }
3371 std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
3372 tasks.reserve(static_cast<std::size_t>(distance));
3373 for (; first1 != last1; ++first1, ++d_first) {
3374 tasks.push_back(transwarp::make_task(transwarp::root, [unary_op,first1,d_first]{ *d_first = unary_op(*first1); }));
3375 }
3376 return transwarp::make_task(transwarp::wait, transwarp::no_op, tasks);
3377 }
3378
3379 /// A function similar to std::transform but returning a transwarp task for
3380 /// deferred, possibly asynchronous execution. This function creates a graph
3381 /// with std::distance(first1, last1) root tasks.
3382 /// Overload for automatic scheduling by passing an executor.
3383 template<typename InputIt, typename OutputIt, typename UnaryOperation>
3384 auto transform(transwarp::executor& executor, InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) -> std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>> {
3385 auto task = transwarp::transform(first1, last1, d_first, unary_op);
3386 task->schedule_all(executor);
3387 return task;
3388 }
3389
3390
3391 /// A task pool that allows running multiple instances of the same task in parallel.
3392 template<typename ResultType>
3393 class task_pool {
3394 public:
3395
3396 /// Constructs a task pool
3397 task_pool(std::shared_ptr<transwarp::task<ResultType>> task,
3398 std::size_t minimum_size,
3399 std::size_t maximum_size)
3400 : task_(std::move(task)),
3401 minimum_(minimum_size),
3402 maximum_(maximum_size),
3403 finished_(maximum_size)
3404 {
3405 if (minimum_ < 1) {
3406 throw transwarp::invalid_parameter{"minimum size"};
3407 }
3408 if (minimum_ > maximum_) {
3409 throw transwarp::invalid_parameter{"minimum or maximum size"};
3410 }
3411 task_->add_listener(transwarp::event_type::after_finished, listener_);
3412 for (std::size_t i=0; i<minimum_; ++i) {
3413 idle_.push(task_->clone());
3414 }
3415 }
3416
3417 /// Constructs a task pool with reasonable defaults for minimum and maximum
3418 explicit
3419 task_pool(std::shared_ptr<transwarp::task<ResultType>> task)
3420 : task_pool(std::move(task), 32, 65536)
3421 {}
3422
3423 // delete copy/move semantics
3424 task_pool(const task_pool&) = delete;
3425 task_pool& operator=(const task_pool&) = delete;
3426 task_pool(task_pool&&) = delete;
3427 task_pool& operator=(task_pool&&) = delete;
3428
3429 /// Returns the next idle task.
3430 /// If there are no idle tasks then it will attempt to double the
3431 /// pool size. If that fails then it will return a nullptr. On successful
3432 /// retrieval of an idle task the function will mark that task as busy.
3433 std::shared_ptr<transwarp::task<ResultType>> next_task(bool maybe_resize=true) {
3434 const transwarp::itask* finished_task{};
3435 {
3436 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3437 if (!finished_.empty()) {
3438 finished_task = finished_.front(); finished_.pop();
3439 }
3440 }
3441
3442 std::shared_ptr<transwarp::task<ResultType>> task;
3443 if (finished_task) {
3444 task = busy_.find(finished_task)->second;
3445 } else {
3446 if (maybe_resize && idle_.empty()) {
3447 resize(size() * 2); // double pool size
3448 }
3449 if (idle_.empty()) {
3450 return nullptr;
3451 }
3452 task = idle_.front(); idle_.pop();
3453 busy_.emplace(task.get(), task);
3454 }
3455
3456 auto future = task->future();
3457 if (future.valid()) {
3458 future.wait(); // will return immediately
3459 }
3460 return task;
3461 }
3462
3463 /// Just like next_task() but waits for a task to become available.
3464 /// The returned task will always be a valid pointer
3465 std::shared_ptr<transwarp::task<ResultType>> wait_for_next_task(bool maybe_resize=true) {
3466 for (;;) {
3467 std::shared_ptr<transwarp::task<ResultType>> g = next_task(maybe_resize);
3468 if (g) {
3469 return g;
3470 }
3471 }
3472 }
3473
3474 /// Returns the current total size of the pool (sum of idle and busy tasks)
3475 std::size_t size() const {
3476 return idle_.size() + busy_.size();
3477 }
3478
3479 /// Returns the minimum size of the pool
3480 std::size_t minimum_size() const {
3481 return minimum_;
3482 }
3483
3484 /// Returns the maximum size of the pool
3485 std::size_t maximum_size() const {
3486 return maximum_;
3487 }
3488
3489 /// Returns the number of idle tasks in the pool
3490 std::size_t idle_count() const {
3491 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3492 return idle_.size() + finished_.size();
3493 }
3494
3495 /// Returns the number of busy tasks in the pool
3496 std::size_t busy_count() const {
3497 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3498 return busy_.size() - finished_.size();
3499 }
3500
3501 /// Resizes the task pool to the given new size if possible
3502 void resize(std::size_t new_size) {
3503 reclaim();
3504 if (new_size > size()) { // grow
3505 const std::size_t count = new_size - size();
3506 for (std::size_t i=0; i<count; ++i) {
3507 if (size() == maximum_) {
3508 break;
3509 }
3510 idle_.push(task_->clone());
3511 }
3512 } else if (new_size < size()) { // shrink
3513 const std::size_t count = size() - new_size;
3514 for (std::size_t i=0; i<count; ++i) {
3515 if (idle_.empty() || size() == minimum_) {
3516 break;
3517 }
3518 idle_.pop();
3519 }
3520 }
3521 }
3522
3523 /// Reclaims finished tasks by marking them as idle again
3524 void reclaim() {
3525 decltype(finished_) finished{finished_.capacity()};
3526 {
3527 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3528 finished_.swap(finished);
3529 }
3530 while (!finished.empty()) {
3531 const transwarp::itask* task = finished.front(); finished.pop();
3532 const auto it = busy_.find(task);
3533 idle_.push(it->second);
3534 busy_.erase(it);
3535 }
3536 }
3537
3538 private:
3539
3540 class finished_listener : public transwarp::listener {
3541 public:
3542
3543 explicit
3544 finished_listener(task_pool<ResultType>& pool)
3545 : pool_(pool)
3546 {}
3547
3548 // Called on a potentially high-priority thread
3549 void handle_event(transwarp::event_type, transwarp::itask& task) override {
3550 std::lock_guard<transwarp::detail::spinlock> lock{pool_.spinlock_};
3551 pool_.finished_.push(static_cast<const transwarp::itask*>(&task));
3552 }
3553
3554 private:
3555 task_pool<ResultType>& pool_;
3556 };
3557
3558 std::shared_ptr<transwarp::task<ResultType>> task_;
3559 std::size_t minimum_;
3560 std::size_t maximum_;
3561 mutable transwarp::detail::spinlock spinlock_; // protecting finished_
3562 transwarp::detail::circular_buffer<const transwarp::itask*> finished_;
3563 std::queue<std::shared_ptr<transwarp::task<ResultType>>> idle_;
3564 std::unordered_map<const transwarp::itask*, std::shared_ptr<transwarp::task<ResultType>>> busy_;
3565 std::shared_ptr<transwarp::listener> listener_{new finished_listener(*this)};
3566 };
3567
3568
3569 /// A timer that tracks the average idle, wait, and run time of each task it listens to.
3570 /// - idle = time between scheduling and starting the task (executor dependent)
3571 /// - wait = time between starting and invoking the task's functor, i.e. wait for parent tasks to finish
3572 /// - run = time between invoking and finishing the task's computations
3573 class timer : public transwarp::listener {
3574 public:
3575 timer() = default;
3576
3577 // delete copy/move semantics
3578 timer(const timer&) = delete;
3579 timer& operator=(const timer&) = delete;
3580 timer(timer&&) = delete;
3581 timer& operator=(timer&&) = delete;
3582
3583 /// Performs the actual timing and populates the task's timing members
3584 void handle_event(const transwarp::event_type event, transwarp::itask& task) override {
3585 switch (event) {
3586 case transwarp::event_type::before_scheduled: {
3587 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3588 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3589 auto& track = tracks_[&task];
3590 track.startidle = now;
3591 }
3592 break;
3593 case transwarp::event_type::before_started: {
3594 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3595 track_idletime(task, now);
3596 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3597 auto& track = tracks_[&task];
3598 track.startwait = now;
3599 }
3600 break;
3601 case transwarp::event_type::after_canceled: {
3602 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3603 track_waittime(task, now);
3604 }
3605 break;
3606 case transwarp::event_type::before_invoked: {
3607 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3608 track_waittime(task, now);
3609 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3610 auto& track = tracks_[&task];
3611 track.running = true;
3612 track.startrun = now;
3613 }
3614 break;
3615 case transwarp::event_type::after_finished: {
3616 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3617 track_runtime(task, now);
3618 }
3619 break;
3620 default: break;
3621 }
3622 }
3623
3624 /// Resets all timing information
3625 void reset() {
3626 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3627 tracks_.clear();
3628 }
3629
3630 private:
3631
3632 void track_idletime(transwarp::itask& task, const std::chrono::time_point<std::chrono::steady_clock>& now) {
3633 std::int64_t avg_idletime_us;
3634 {
3635 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3636 auto& track = tracks_[&task];
3637 track.idletime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startidle).count();
3638 ++track.idlecount;
3639 avg_idletime_us = static_cast<std::int64_t>(track.idletime / track.idlecount);
3640 }
3641 task.set_avg_idletime_us(avg_idletime_us);
3642 }
3643
3644 void track_waittime(transwarp::itask& task, const std::chrono::time_point<std::chrono::steady_clock>& now) {
3645 std::int64_t avg_waittime_us;
3646 {
3647 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3648 auto& track = tracks_[&task];
3649 track.waittime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startwait).count();
3650 ++track.waitcount;
3651 avg_waittime_us = static_cast<std::int64_t>(track.waittime / track.waitcount);
3652 }
3653 task.set_avg_waittime_us(avg_waittime_us);
3654 }
3655
3656 void track_runtime(transwarp::itask& task, const std::chrono::time_point<std::chrono::steady_clock>& now) {
3657 std::int64_t avg_runtime_us;
3658 {
3659 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3660 auto& track = tracks_[&task];
3661 if (!track.running) {
3662 return;
3663 }
3664 track.running = false;
3665 track.runtime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startrun).count();
3666 ++track.runcount;
3667 avg_runtime_us = static_cast<std::int64_t>(track.runtime / track.runcount);
3668 }
3669 task.set_avg_runtime_us(avg_runtime_us);
3670 }
3671
3672 struct track {
3673 bool running = false;
3674 std::chrono::time_point<std::chrono::steady_clock> startidle;
3675 std::chrono::time_point<std::chrono::steady_clock> startwait;
3676 std::chrono::time_point<std::chrono::steady_clock> startrun;
3677 std::chrono::microseconds::rep idletime = 0;
3678 std::chrono::microseconds::rep idlecount = 0;
3679 std::chrono::microseconds::rep waittime = 0;
3680 std::chrono::microseconds::rep waitcount = 0;
3681 std::chrono::microseconds::rep runtime = 0;
3682 std::chrono::microseconds::rep runcount = 0;
3683 };
3684
3685 transwarp::detail::spinlock spinlock_; // protecting tracks_
3686 std::unordered_map<const transwarp::itask*, track> tracks_;
3687 };
3688
3689
3690 /// The releaser will release a task's future when the task's `after_satisfied`
3691 /// event was received which happens when all children received the task's result.
3692 /// The releaser should be used in cases where the task's result is only needed
3693 /// for consumption by its children and can then be discarded.
3694 class releaser : public transwarp::listener {
3695 public:
3696 releaser() = default;
3697
3698 /// The executor gives control over where a task's future is released
3699 explicit releaser(std::shared_ptr<transwarp::executor> executor)
3700 : executor_(std::move(executor))
3701 {}
3702
3703 // delete copy/move semantics
3704 releaser(const releaser&) = delete;
3705 releaser& operator=(const releaser&) = delete;
3706 releaser(releaser&&) = delete;
3707 releaser& operator=(releaser&&) = delete;
3708
3709 void handle_event(const transwarp::event_type event, transwarp::itask& task) override {
3710 if (event == transwarp::event_type::after_satisfied) {
3711 if (executor_) {
3712 executor_->execute([&task]{ task.reset_future(); }, task);
3713 } else {
3714 task.reset_future();
3715 }
3716 }
3717 }
3718
3719 private:
3720 std::shared_ptr<transwarp::executor> executor_;
3721 };
3722
3723
3724 } // transwarp
3725