1 //  Copyright (c) 2007-2017 Hartmut Kaiser
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 #if !defined(HPX_LCOS_DETAIL_FUTURE_DATA_MAR_06_2012_1055AM)
7 #define HPX_LCOS_DETAIL_FUTURE_DATA_MAR_06_2012_1055AM
8 
9 #include <hpx/config.hpp>
10 #include <hpx/error_code.hpp>
11 #include <hpx/lcos/local/detail/condition_variable.hpp>
12 #include <hpx/lcos/local/spinlock.hpp>
13 #include <hpx/runtime/launch_policy.hpp>
14 #include <hpx/runtime/threads/coroutines/detail/get_stack_pointer.hpp>
15 #include <hpx/runtime/threads/thread_executor.hpp>
16 #include <hpx/runtime/threads/thread_helpers.hpp>
17 #include <hpx/throw_exception.hpp>
18 #include <hpx/traits/future_access.hpp>
19 #include <hpx/traits/get_remote_result.hpp>
20 #include <hpx/util/annotated_function.hpp>
21 #include <hpx/util/assert.hpp>
22 #include <hpx/util/assert_owns_lock.hpp>
23 #include <hpx/util/atomic_count.hpp>
24 #include <hpx/util/bind.hpp>
25 #include <hpx/util/decay.hpp>
26 #include <hpx/util/steady_clock.hpp>
27 #include <hpx/util/unique_function.hpp>
28 #include <hpx/util/unused.hpp>
29 
30 #include <boost/intrusive_ptr.hpp>
31 
32 // NOTE: small_vector was introduced in 1.58 but seems to be buggy in that
33 // version, so use only from 1.59 onwards.
34 #if BOOST_VERSION < 105900
35 #include <vector>
36 #else
37 #include <boost/container/small_vector.hpp>
38 #endif
39 
40 #include <atomic>
41 #include <chrono>
42 #include <cstddef>
43 #include <exception>
44 #include <memory>
45 #include <mutex>
46 #include <string>
47 #include <type_traits>
48 #include <utility>
49 
50 #include <hpx/config/warnings_prefix.hpp>
51 
52 ///////////////////////////////////////////////////////////////////////////////
53 namespace hpx { namespace lcos
54 {
55     enum class future_status
56     {
57         ready, timeout, deferred, uninitialized
58     };
59 }}
60 
61 ///////////////////////////////////////////////////////////////////////////////
62 namespace hpx { namespace lcos
63 {
64 
65 namespace detail
66 {
67     template <typename Result> struct future_data;
68 
69     ///////////////////////////////////////////////////////////////////////
70     struct future_data_refcnt_base;
71 
72     void intrusive_ptr_add_ref(future_data_refcnt_base* p);
73     void intrusive_ptr_release(future_data_refcnt_base* p);
74 
75     ///////////////////////////////////////////////////////////////////////
76     struct HPX_EXPORT future_data_refcnt_base
77     {
78     public:
79         typedef util::unique_function_nonser<void()> completed_callback_type;
80 #if BOOST_VERSION < 105900
81         typedef std::vector<completed_callback_type>
82             completed_callback_vector_type;
83 #else
84         typedef boost::container::small_vector<completed_callback_type, 3>
85             completed_callback_vector_type;
86 #endif
87 
88         typedef void has_future_data_refcnt_base;
89 
90         virtual ~future_data_refcnt_base();
91 
92         virtual void set_on_completed(completed_callback_type) = 0;
93 
requires_deletehpx::lcos::detail::future_data_refcnt_base94         virtual bool requires_delete()
95         {
96             return 0 == --count_;
97         }
98 
destroyhpx::lcos::detail::future_data_refcnt_base99         virtual void destroy()
100         {
101             delete this;
102         }
103 
104         // This is a tag type used to convey the information that the caller is
105         // _not_ going to addref the future_data instance
106         struct init_no_addref {};
107 
108     protected:
future_data_refcnt_basehpx::lcos::detail::future_data_refcnt_base109         future_data_refcnt_base() : count_(0) {}
future_data_refcnt_basehpx::lcos::detail::future_data_refcnt_base110         future_data_refcnt_base(init_no_addref) : count_(1) {}
111 
112         // reference counting
113         friend void intrusive_ptr_add_ref(future_data_refcnt_base* p);
114         friend void intrusive_ptr_release(future_data_refcnt_base* p);
115 
116         util::atomic_count count_;
117     };
118 
119     /// support functions for boost::intrusive_ptr
intrusive_ptr_add_ref(future_data_refcnt_base * p)120     inline void intrusive_ptr_add_ref(future_data_refcnt_base* p)
121     {
122         ++p->count_;
123     }
intrusive_ptr_release(future_data_refcnt_base * p)124     inline void intrusive_ptr_release(future_data_refcnt_base* p)
125     {
126         if (p->requires_delete())
127             p->destroy();
128     }
129 
130     ///////////////////////////////////////////////////////////////////////////
131     template <typename Result>
132     struct future_data_result
133     {
134         typedef Result type;
135 
136         template <typename U>
137         HPX_FORCEINLINE static
sethpx::lcos::detail::future_data_result138         U && set(U && u)
139         {
140             return std::forward<U>(u);
141         }
142     };
143 
144     template <typename Result>
145     struct future_data_result<Result&>
146     {
147         typedef Result* type;
148 
149         HPX_FORCEINLINE static
sethpx::lcos::detail::future_data_result150         Result* set(Result* u)
151         {
152             return u;
153         }
154 
155         HPX_FORCEINLINE static
sethpx::lcos::detail::future_data_result156         Result* set(Result& u)
157         {
158             return &u;
159         }
160     };
161 
162     template <>
163     struct future_data_result<void>
164     {
165         typedef util::unused_type type;
166 
167         HPX_FORCEINLINE static
sethpx::lcos::detail::future_data_result168         util::unused_type set(util::unused_type u)
169         {
170             return u;
171         }
172     };
173 
174     ///////////////////////////////////////////////////////////////////////////
175     template <typename R>
176     struct future_data_storage
177     {
178         typedef typename future_data_result<R>::type value_type;
179         typedef std::exception_ptr error_type;
180 
181         // determine the required alignment, define aligned storage of proper
182         // size
183         HPX_STATIC_CONSTEXPR std::size_t max_alignment =
184             (std::alignment_of<value_type>::value >
185              std::alignment_of<error_type>::value) ?
186             std::alignment_of<value_type>::value
187           : std::alignment_of<error_type>::value;
188 
189         HPX_STATIC_CONSTEXPR std::size_t max_size =
190                 (sizeof(value_type) > sizeof(error_type)) ?
191                     sizeof(value_type) : sizeof(error_type);
192 
193         typedef typename std::aligned_storage<max_size, max_alignment>::type type;
194     };
195 
196     ///////////////////////////////////////////////////////////////////////////
197     template <typename Result>
198     struct future_data_base;
199 
200     template <>
201     struct HPX_EXPORT future_data_base<traits::detail::future_data_void>
202       : future_data_refcnt_base
203     {
future_data_basehpx::lcos::detail::future_data_base204         future_data_base()
205           : state_(empty)
206         {}
207 
future_data_basehpx::lcos::detail::future_data_base208         future_data_base(init_no_addref no_addref)
209           : future_data_refcnt_base(no_addref), state_(empty)
210         {}
211 
212         using future_data_refcnt_base::completed_callback_type;
213         using future_data_refcnt_base::completed_callback_vector_type;
214         typedef lcos::local::spinlock mutex_type;
215         typedef util::unused_type result_type;
216         typedef future_data_refcnt_base::init_no_addref init_no_addref;
217 
218         virtual ~future_data_base();
219 
220         enum state
221         {
222             empty = 0,
223             ready = 1,
224             value = 2 | ready,
225             exception = 4 | ready
226         };
227 
228         /// Return whether or not the data is available for this
229         /// \a future.
is_readyhpx::lcos::detail::future_data_base230         bool is_ready(std::memory_order order = std::memory_order_acquire) const
231         {
232             return (state_.load(order) & ready) != 0;
233         }
234 
has_valuehpx::lcos::detail::future_data_base235         bool has_value() const
236         {
237             return state_.load(std::memory_order_acquire) == value;
238         }
239 
has_exceptionhpx::lcos::detail::future_data_base240         bool has_exception() const
241         {
242             return state_.load(std::memory_order_acquire) == exception;
243         }
244 
execute_deferredhpx::lcos::detail::future_data_base245         virtual void execute_deferred(error_code& /*ec*/ = throws) {}
246 
247         // cancellation is disabled by default
cancelablehpx::lcos::detail::future_data_base248         virtual bool cancelable() const
249         {
250             return false;
251         }
cancelhpx::lcos::detail::future_data_base252         virtual void cancel()
253         {
254             HPX_THROW_EXCEPTION(future_does_not_support_cancellation,
255                 "future_data_base::cancel",
256                 "this future does not support cancellation");
257         }
258 
259         result_type* get_result_void(void const* storage, error_code& ec = throws);
260         virtual result_type* get_result_void(error_code& ec = throws) = 0;
261 
262         virtual void set_exception(std::exception_ptr data) = 0;
263 
264         // continuation support
265 
266         // deferred execution of a given continuation
267         static void run_on_completed(
268             completed_callback_type&& on_completed) noexcept;
269         static void run_on_completed(
270             completed_callback_vector_type&& on_completed) noexcept;
271 
272         // make sure continuation invocation does not recurse deeper than
273         // allowed
274         template <typename Callback>
275         static void handle_on_completed(Callback&& on_completed);
276 
277         /// Set the callback which needs to be invoked when the future becomes
278         /// ready. If the future is ready the function will be invoked
279         /// immediately.
280         void set_on_completed(completed_callback_type data_sink) override;
281 
282         virtual state wait(error_code& ec = throws);
283 
284         virtual future_status wait_until(
285             util::steady_clock::time_point const& abs_time, error_code& ec = throws);
286 
287         virtual std::exception_ptr get_exception_ptr() const = 0;
288 
get_registered_namehpx::lcos::detail::future_data_base289         virtual std::string const& get_registered_name() const
290         {
291             HPX_THROW_EXCEPTION(invalid_status,
292                 "future_data_base::get_registered_name",
293                 "this future does not support name registration");
294         }
register_ashpx::lcos::detail::future_data_base295         virtual void register_as(std::string const& /*name*/, bool /*manage_lifetime*/)
296         {
297             HPX_THROW_EXCEPTION(invalid_status,
298                 "future_data_base::set_registered_name",
299                 "this future does not support name registration");
300         }
301 
302     protected:
303         mutable mutex_type mtx_;
304         std::atomic<state> state_;                  // current state
305         completed_callback_vector_type on_completed_;
306         local::detail::condition_variable cond_;    // threads waiting in read
307     };
308 
309     template <typename Result>
310     struct future_data_base
311       : future_data_base<traits::detail::future_data_void>
312     {
313         HPX_NON_COPYABLE(future_data_base);
314 
315     private:
constructhpx::lcos::detail::future_data_base316         static void construct(void* p)
317         {
318             ::new (p) result_type();
319         }
320 
321         template <typename T, typename ... Ts>
constructhpx::lcos::detail::future_data_base322         static void construct(void* p, T && t, Ts &&... ts)
323         {
324             ::new (p) result_type(
325                 future_data_result<Result>::set(std::forward<T>(t)),
326                 std::forward<Ts>(ts)...);
327         }
328 
329     public:
330         typedef typename future_data_result<Result>::type result_type;
331         typedef future_data_base<traits::detail::future_data_void> base_type;
332         typedef lcos::local::spinlock mutex_type;
333         typedef typename base_type::init_no_addref init_no_addref;
334         typedef typename base_type::completed_callback_type completed_callback_type;
335         typedef typename base_type::completed_callback_vector_type
336             completed_callback_vector_type;
337 
338         future_data_base() = default;
339 
future_data_basehpx::lcos::detail::future_data_base340         future_data_base(init_no_addref no_addref)
341           : base_type(no_addref)
342         {}
343 
344         struct default_construct {};
345 
future_data_basehpx::lcos::detail::future_data_base346         future_data_base(init_no_addref no_addref, default_construct)
347           : base_type(no_addref)
348         {
349             result_type* value_ptr = reinterpret_cast<result_type*>(&storage_);
350             construct(value_ptr);
351             state_.store(value, std::memory_order_relaxed);
352         }
353 
354         template <typename ... Ts>
future_data_basehpx::lcos::detail::future_data_base355         future_data_base(init_no_addref no_addref, Ts&&... ts)
356           : base_type(no_addref)
357         {
358             result_type* value_ptr = reinterpret_cast<result_type*>(&storage_);
359             construct(value_ptr, std::forward<Ts>(ts)...);
360             state_.store(value, std::memory_order_relaxed);
361         }
362 
future_data_basehpx::lcos::detail::future_data_base363         future_data_base(init_no_addref no_addref, std::exception_ptr const& e)
364           : base_type(no_addref)
365         {
366             std::exception_ptr* exception_ptr =
367                 reinterpret_cast<std::exception_ptr*>(&storage_);
368             ::new ((void*)exception_ptr) std::exception_ptr(e);
369             state_.store(exception, std::memory_order_relaxed);
370         }
future_data_basehpx::lcos::detail::future_data_base371         future_data_base(init_no_addref no_addref, std::exception_ptr && e)
372           : base_type(no_addref)
373         {
374             std::exception_ptr* exception_ptr =
375                 reinterpret_cast<std::exception_ptr*>(&storage_);
376             ::new ((void*)exception_ptr) std::exception_ptr(std::move(e));
377             state_.store(exception, std::memory_order_relaxed);
378         }
379 
~future_data_basehpx::lcos::detail::future_data_base380         virtual ~future_data_base() noexcept
381         {
382             reset();
383         }
384 
385         /// Get the result of the requested action. This call blocks (yields
386         /// control) if the result is not ready. As soon as the result has been
387         /// returned and the waiting thread has been re-scheduled by the thread
388         /// manager the function will return.
389         ///
390         /// \param ec     [in,out] this represents the error status on exit,
391         ///               if this is pre-initialized to \a hpx#throws
392         ///               the function will throw on error instead. If the
393         ///               operation blocks and is aborted because the object
394         ///               went out of scope, the code \a hpx#yield_aborted is
395         ///               set or thrown.
396         ///
397         /// \note         If there has been an error reported (using the action
398         ///               \a base_lco#set_exception), this function will throw an
399         ///               exception encapsulating the reported error code and
400         ///               error description if <code>&ec == &throws</code>.
get_resulthpx::lcos::detail::future_data_base401         virtual result_type* get_result(error_code& ec = throws)
402         {
403             if (get_result_void(ec) != nullptr)
404                 return reinterpret_cast<result_type*>(&storage_);
405             return nullptr;
406         }
407 
get_result_voidhpx::lcos::detail::future_data_base408         util::unused_type* get_result_void(error_code& ec = throws) override
409         {
410             return base_type::get_result_void(&storage_, ec);
411         }
412 
413         // Set the result of the requested action.
414         template <typename ... Ts>
set_valuehpx::lcos::detail::future_data_base415         void set_value(Ts&& ... ts)
416         {
417             // Note: it is safe to access the data store as no other thread
418             //       should access it concurrently. There shouldn't be any
419             //       threads attempting to read the value as the state is still
420             //       empty. Also, there can be only one thread (this thread)
421             //       attempting to set the value by definition.
422 
423             // set the data
424             result_type* value_ptr = reinterpret_cast<result_type*>(&storage_);
425             construct(value_ptr, std::forward<Ts>(ts)...);
426 
427             // At this point the lock needs to be acquired to safely access the
428             // registered continuations
429             std::unique_lock<mutex_type> l(mtx_);
430 
431             // handle all threads waiting for the future to become ready
432             auto on_completed = std::move(on_completed_);
433             on_completed_.clear();
434 
435             // The value has been set, changing the state to 'value' at this
436             // point signals to all other threads that this future is ready.
437             state expected = empty;
438             if (!state_.compare_exchange_strong(
439                     expected, value, std::memory_order_release))
440             {
441                 // this future should be 'empty' still (it can't be made ready
442                 // more than once).
443                 l.unlock();
444                 HPX_THROW_EXCEPTION(promise_already_satisfied,
445                     "future_data_base::set_value",
446                     "data has already been set for this future");
447                 return;
448             }
449 
450             // Note: we use notify_one repeatedly instead of notify_all as we
451             //       know: a) that most of the time we have at most one thread
452             //       waiting on the future (most futures are not shared), and
453             //       b) our implementation of condition_variable::notify_one
454             //       relinquishes the lock before resuming the waiting thread
455             //       which avoids suspension of this thread when it tries to
456             //       re-lock the mutex while exiting from condition_variable::wait
457             while (cond_.notify_one(std::move(l), threads::thread_priority_boost))
458             {
459                 l = std::unique_lock<mutex_type>(mtx_);
460             }
461 
462             // Note: cv.notify_one() above 'consumes' the lock 'l' and leaves
463             //       it unlocked when returning.
464 
465             // invoke the callback (continuation) function
466             if (!on_completed.empty())
467                 handle_on_completed(std::move(on_completed));
468         }
469 
set_exceptionhpx::lcos::detail::future_data_base470         void set_exception(std::exception_ptr data) override
471         {
472             // Note: it is safe to access the data store as no other thread
473             //       should access it concurrently. There shouldn't be any
474             //       threads attempting to read the value as the state is still
475             //       empty. Also, there can be only one thread (this thread)
476             //       attempting to set the value by definition.
477 
478             // set the data
479             std::exception_ptr* exception_ptr =
480                 reinterpret_cast<std::exception_ptr*>(&storage_);
481             ::new ((void*)exception_ptr) std::exception_ptr(std::move(data));
482 
483             // At this point the lock needs to be acquired to safely access the
484             // registered continuations
485             std::unique_lock<mutex_type> l(mtx_);
486 
487             // handle all threads waiting for the future to become ready
488             auto on_completed = std::move(on_completed_);
489             on_completed_.clear();
490 
491             // The value has been set, changing the state to 'exception' at this
492             // point signals to all other threads that this future is ready.
493             state expected = empty;
494             if (!state_.compare_exchange_strong(
495                     expected, exception, std::memory_order_release))
496             {
497                 // this future should be 'empty' still (it can't be made ready
498                 // more than once).
499                 l.unlock();
500                 HPX_THROW_EXCEPTION(promise_already_satisfied,
501                     "future_data_base::set_exception",
502                     "data has already been set for this future");
503                 return;
504             }
505 
506             // Note: we use notify_one repeatedly instead of notify_all as we
507             //       know: a) that most of the time we have at most one thread
508             //       waiting on the future (most futures are not shared), and
509             //       b) our implementation of condition_variable::notify_one
510             //       relinquishes the lock before resuming the waiting thread
511             //       which avoids suspension of this thread when it tries to
512             //       re-lock the mutex while exiting from condition_variable::wait
513             while (cond_.notify_one(std::move(l), threads::thread_priority_boost))
514             {
515                 l = std::unique_lock<mutex_type>(mtx_);
516             }
517 
518             // Note: cv.notify_one() above 'consumes' the lock 'l' and leaves
519             //       it unlocked when returning.
520 
521             // invoke the callback (continuation) function
522             if (!on_completed.empty())
523                 handle_on_completed(std::move(on_completed));
524         }
525 
526         // helper functions for setting data (if successful) or the error (if
527         // non-successful)
528         template <typename T>
set_datahpx::lcos::detail::future_data_base529         void set_data(T && result)
530         {
531             // set the received result, reset error status
532             try {
533                 typedef typename util::decay<T>::type naked_type;
534 
535                 typedef traits::get_remote_result<
536                     result_type, naked_type
537                 > get_remote_result_type;
538 
539                 // store the value
540                 set_value(std::move(get_remote_result_type::call(
541                         std::forward<T>(result))));
542             }
543             catch (...) {
544                 // store the error instead
545                 return set_exception(std::current_exception());
546             }
547         }
548 
549         // trigger the future with the given error condition
set_errorhpx::lcos::detail::future_data_base550         void set_error(error e, char const* f, char const* msg)
551         {
552             try {
553                 HPX_THROW_EXCEPTION(e, f, msg);
554             }
555             catch (...) {
556                 // store the error code
557                 set_exception(std::current_exception());
558             }
559         }
560 
561         /// Reset the promise to allow to restart an asynchronous
562         /// operation. Allows any subsequent set_data operation to succeed.
resethpx::lcos::detail::future_data_base563         void reset(error_code& /*ec*/ = throws)
564         {
565             // no locking is required as semantics guarantee a single writer
566             // and no reader
567 
568             // release any stored data and callback functions
569             switch (state_.exchange(empty)) {
570             case value:
571             {
572                 result_type* value_ptr =
573                     reinterpret_cast<result_type*>(&storage_);
574                 value_ptr->~result_type();
575                 break;
576             }
577             case exception:
578             {
579                 std::exception_ptr* exception_ptr =
580                     reinterpret_cast<std::exception_ptr*>(&storage_);
581                 exception_ptr->~exception_ptr();
582                 break;
583             }
584             default: break;
585             }
586 
587             on_completed_.clear();
588         }
589 
get_exception_ptrhpx::lcos::detail::future_data_base590         std::exception_ptr get_exception_ptr() const override
591         {
592             HPX_ASSERT(state_.load(std::memory_order_acquire) == exception);
593             return *reinterpret_cast<std::exception_ptr const*>(&storage_);
594         }
595 
596     protected:
597         using base_type::mtx_;
598         using base_type::state_;
599         using base_type::on_completed_;
600 
601     private:
602         using base_type::cond_;
603         typename future_data_storage<Result>::type storage_;
604     };
605 
606     ///////////////////////////////////////////////////////////////////////////
607     // Customization point to have the ability for creating distinct shared
608     // states depending on the value type held.
609     template <typename Result>
610     struct future_data : future_data_base<Result>
611     {
612         HPX_NON_COPYABLE(future_data);
613 
614         typedef typename future_data_base<Result>::init_no_addref init_no_addref;
615 
616         future_data() = default;
617 
future_datahpx::lcos::detail::future_data618         future_data(init_no_addref no_addref)
619           : future_data_base<Result>(no_addref)
620         {}
621 
622         template <typename ... Ts>
future_datahpx::lcos::detail::future_data623         future_data(init_no_addref no_addref, Ts&&... ts)
624           : future_data_base<Result>(no_addref, std::forward<Ts>(ts)...)
625         {}
626 
future_datahpx::lcos::detail::future_data627         future_data(init_no_addref no_addref, std::exception_ptr const& e)
628           : future_data_base<Result>(no_addref, e)
629         {}
future_datahpx::lcos::detail::future_data630         future_data(init_no_addref no_addref, std::exception_ptr && e)
631           : future_data_base<Result>(no_addref, std::move(e))
632         {}
633 
634         ~future_data() noexcept override = default;
635     };
636 
637     // Specialization for shared state of id_type, additionally (optionally)
638     // holds a registered name for the object it refers to.
639     template <> struct future_data<id_type>;
640 
641     ///////////////////////////////////////////////////////////////////////////
642     template <typename Result, typename Allocator>
643     struct future_data_allocator : future_data<Result>
644     {
645         typedef typename future_data<Result>::init_no_addref init_no_addref;
646         typedef typename
647                 std::allocator_traits<Allocator>::template
648                     rebind_alloc<future_data_allocator>
649             other_allocator;
650 
651         typedef typename future_data_base<Result>::default_construct
652             default_construct;
653 
future_data_allocatorhpx::lcos::detail::future_data_allocator654         future_data_allocator(other_allocator const& alloc)
655           : future_data<Result>()
656           , alloc_(alloc)
657         {}
658 
659         template <typename... T>
future_data_allocatorhpx::lcos::detail::future_data_allocator660         future_data_allocator(init_no_addref no_addref,
661                 other_allocator const& alloc, T&&... ts)
662           : future_data<Result>(no_addref, std::forward<T>(ts)...)
663           , alloc_(alloc)
664         {}
665 
666         template <typename... T>
future_data_allocatorhpx::lcos::detail::future_data_allocator667         future_data_allocator(init_no_addref no_addref,
668                 default_construct defctr,
669                 other_allocator const& alloc, T&&... ts)
670           : future_data<Result>(no_addref, defctr, std::forward<T>(ts)...)
671           , alloc_(alloc)
672         {}
673 
future_data_allocatorhpx::lcos::detail::future_data_allocator674         future_data_allocator(init_no_addref no_addref,
675                 std::exception_ptr const& e, other_allocator const& alloc)
676           : future_data<Result>(no_addref, e)
677           , alloc_(alloc)
678         {}
679 
future_data_allocatorhpx::lcos::detail::future_data_allocator680         future_data_allocator(init_no_addref no_addref,
681                 std::exception_ptr && e, other_allocator const& alloc)
682           : future_data<Result>(no_addref, std::move(e))
683           , alloc_(alloc)
684         {}
685 
686     private:
destroyhpx::lcos::detail::future_data_allocator687         void destroy()
688         {
689             typedef std::allocator_traits<other_allocator> traits;
690 
691             other_allocator alloc(alloc_);
692             traits::destroy(alloc, this);
693             traits::deallocate(alloc, this, 1);
694         }
695 
696     private:
697         other_allocator alloc_;
698     };
699 
700     ///////////////////////////////////////////////////////////////////////////
701     template <typename Result>
702     struct timed_future_data : future_data<Result>
703     {
704     public:
705         typedef future_data<Result> base_type;
706         typedef typename base_type::result_type result_type;
707         typedef typename base_type::mutex_type mutex_type;
708 
709     public:
timed_future_datahpx::lcos::detail::timed_future_data710         timed_future_data() {}
711 
712         template <typename Result_>
timed_future_datahpx::lcos::detail::timed_future_data713         timed_future_data(
714             util::steady_clock::time_point const& abs_time,
715             Result_&& init)
716         {
717             boost::intrusive_ptr<timed_future_data> this_(this);
718 
719             error_code ec;
720             threads::thread_id_type id = threads::register_thread_nullary(
721                 [HPX_CAPTURE_MOVE(this_), HPX_CAPTURE_FORWARD(init)]()
722                 {
723                     this_->set_value(init);
724                 },
725                 "timed_future_data<Result>::timed_future_data",
726                 threads::suspended, true, threads::thread_priority_boost,
727                 threads::thread_schedule_hint(),
728                 threads::thread_stacksize_current, ec);
729             if (ec) {
730                 // thread creation failed, report error to the new future
731                 this->base_type::set_exception(hpx::detail::access_exception(ec));
732             }
733 
734             // start new thread at given point in time
735             threads::set_thread_state(id, abs_time, threads::pending,
736                 threads::wait_timeout, threads::thread_priority_boost, ec);
737             if (ec) {
738                 // thread scheduling failed, report error to the new future
739                 this->base_type::set_exception(hpx::detail::access_exception(ec));
740             }
741         }
742     };
743 
744     ///////////////////////////////////////////////////////////////////////////
745     template <typename Result>
746     struct task_base : future_data<Result>
747     {
748     protected:
749         typedef future_data<Result> base_type;
750         typedef typename future_data<Result>::mutex_type mutex_type;
751         typedef boost::intrusive_ptr<task_base> future_base_type;
752         typedef typename future_data<Result>::result_type result_type;
753         typedef typename base_type::init_no_addref init_no_addref;
754 
755     public:
task_basehpx::lcos::detail::task_base756         task_base()
757           : started_(false)
758         {}
759 
task_basehpx::lcos::detail::task_base760         task_base(init_no_addref no_addref)
761           : base_type(no_addref), started_(false)
762         {}
763 
execute_deferredhpx::lcos::detail::task_base764         virtual void execute_deferred(error_code& /*ec*/ = throws)
765         {
766             if (!started_test_and_set())
767                 this->do_run();
768         }
769 
770         // retrieving the value
get_resulthpx::lcos::detail::task_base771         virtual result_type* get_result(error_code& ec = throws)
772         {
773             if (!started_test_and_set())
774                 this->do_run();
775             return this->future_data<Result>::get_result(ec);
776         }
777 
778         // wait support
waithpx::lcos::detail::task_base779         virtual typename base_type::state wait(error_code& ec = throws)
780         {
781             if (!started_test_and_set())
782                 this->do_run();
783             return this->future_data<Result>::wait(ec);
784         }
785 
786         virtual future_status
wait_untilhpx::lcos::detail::task_base787         wait_until(util::steady_clock::time_point const& abs_time,
788             error_code& ec = throws)
789         {
790             if (!started_test())
791                 return future_status::deferred; //-V110
792             return this->future_data<Result>::wait_until(abs_time, ec);
793         }
794 
795     private:
started_testhpx::lcos::detail::task_base796         bool started_test() const
797         {
798             std::lock_guard<mutex_type> l(this->mtx_);
799             return started_;
800         }
801 
802         template <typename Lock>
started_test_and_set_lockedhpx::lcos::detail::task_base803         bool started_test_and_set_locked(Lock& l)
804         {
805             HPX_ASSERT_OWNS_LOCK(l);
806             if (started_)
807                 return true;
808 
809             started_ = true;
810             return false;
811         }
812 
813     protected:
started_test_and_sethpx::lcos::detail::task_base814         bool started_test_and_set()
815         {
816             std::lock_guard<mutex_type> l(this->mtx_);
817             return started_test_and_set_locked(l);
818         }
819 
check_startedhpx::lcos::detail::task_base820         void check_started()
821         {
822             std::unique_lock<mutex_type> l(this->mtx_);
823             if (started_) {
824                 l.unlock();
825                 HPX_THROW_EXCEPTION(task_already_started,
826                     "task_base::check_started",
827                     "this task has already been started");
828                 return;
829             }
830             started_ = true;
831         }
832 
833     public:
834         // run synchronously
runhpx::lcos::detail::task_base835         void run()
836         {
837             check_started();
838             this->do_run();       // always on this thread
839         }
840 
841         // run in a separate thread
applyhpx::lcos::detail::task_base842         virtual threads::thread_id_type apply(launch /*policy*/,
843             threads::thread_priority /*priority*/,
844             threads::thread_stacksize /*stacksize*/,
845             threads::thread_schedule_hint /*schedulehint*/,
846             error_code& /*ec*/)
847         {
848             HPX_ASSERT(false);      // shouldn't ever be called
849             return threads::invalid_thread_id;
850         }
851 
852     protected:
run_implhpx::lcos::detail::task_base853         static threads::thread_result_type run_impl(future_base_type this_)
854         {
855             this_->do_run();
856             return threads::thread_result_type(
857                 threads::terminated, threads::invalid_thread_id);
858         }
859 
860     public:
861         template <typename T>
set_datahpx::lcos::detail::task_base862         void set_data(T && result)
863         {
864             this->future_data<Result>::set_data(std::forward<T>(result));
865         }
866 
set_exceptionhpx::lcos::detail::task_base867         void set_exception(std::exception_ptr e)
868         {
869             this->future_data<Result>::set_exception(std::move(e));
870         }
871 
do_runhpx::lcos::detail::task_base872         virtual void do_run()
873         {
874             HPX_ASSERT(false);      // shouldn't ever be called
875         }
876 
877     protected:
878         bool started_;
879     };
880 
881     ///////////////////////////////////////////////////////////////////////////
882     template <typename Result>
883     struct cancelable_task_base : task_base<Result>
884     {
885     protected:
886         typedef typename task_base<Result>::mutex_type mutex_type;
887         typedef boost::intrusive_ptr<cancelable_task_base> future_base_type;
888         typedef typename future_data<Result>::result_type result_type;
889         typedef typename task_base<Result>::init_no_addref init_no_addref;
890 
891     protected:
get_thread_idhpx::lcos::detail::cancelable_task_base892         threads::thread_id_type get_thread_id() const
893         {
894             std::lock_guard<mutex_type> l(this->mtx_);
895             return id_;
896         }
set_thread_idhpx::lcos::detail::cancelable_task_base897         void set_thread_id(threads::thread_id_type id)
898         {
899             std::lock_guard<mutex_type> l(this->mtx_);
900             id_ = id;
901         }
902 
903     public:
cancelable_task_basehpx::lcos::detail::cancelable_task_base904         cancelable_task_base()
905           : id_(threads::invalid_thread_id)
906         {}
907 
cancelable_task_basehpx::lcos::detail::cancelable_task_base908         cancelable_task_base(init_no_addref no_addref)
909           : task_base<Result>(no_addref), id_(threads::invalid_thread_id)
910         {}
911 
912     private:
913         struct reset_id
914         {
reset_idhpx::lcos::detail::cancelable_task_base::reset_id915             reset_id(cancelable_task_base& target)
916               : target_(target)
917             {
918                 target.set_thread_id(threads::get_self_id());
919             }
~reset_idhpx::lcos::detail::cancelable_task_base::reset_id920             ~reset_id()
921             {
922                 target_.set_thread_id(threads::invalid_thread_id);
923             }
924             cancelable_task_base& target_;
925         };
926 
927     protected:
run_implhpx::lcos::detail::cancelable_task_base928         static threads::thread_result_type run_impl(future_base_type this_)
929         {
930             reset_id r(*this_);
931             this_->do_run();
932             return threads::thread_result_type(
933                 threads::terminated, threads::invalid_thread_id);
934         }
935 
936     public:
937         // cancellation support
cancelablehpx::lcos::detail::cancelable_task_base938         bool cancelable() const
939         {
940             return true;
941         }
942 
cancelhpx::lcos::detail::cancelable_task_base943         void cancel()
944         {
945             std::unique_lock<mutex_type> l(this->mtx_);
946             try {
947                 if (!this->started_)
948                     HPX_THROW_THREAD_INTERRUPTED_EXCEPTION();
949 
950                 if (this->is_ready())
951                     return;   // nothing we can do
952 
953                 if (id_ != threads::invalid_thread_id) {
954                     // interrupt the executing thread
955                     threads::interrupt_thread(id_);
956 
957                     this->started_ = true;
958 
959                     l.unlock();
960                     this->set_error(future_cancelled,
961                         "task_base<Result>::cancel",
962                         "future has been canceled");
963                 }
964                 else {
965                     l.unlock();
966                     HPX_THROW_EXCEPTION(future_can_not_be_cancelled,
967                         "task_base<Result>::cancel",
968                         "future can't be canceled at this time");
969                 }
970             }
971             catch (...) {
972                 this->started_ = true;
973                 this->set_exception(std::current_exception());
974                 throw;
975             }
976         }
977 
978     protected:
979         threads::thread_id_type id_;
980     };
981 }}}
982 
983 namespace hpx { namespace traits
984 {
985     namespace detail
986     {
987         template <typename R, typename Allocator>
988         struct shared_state_allocator<lcos::detail::future_data<R>, Allocator>
989         {
990             typedef lcos::detail::future_data_allocator<R, Allocator> type;
991         };
992     }
993 }}
994 
995 #include <hpx/config/warnings_suffix.hpp>
996 
997 #endif
998