1 //
2 // impl/awaitable.hpp
3 // ~~~~~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10 
11 #ifndef BOOST_ASIO_IMPL_AWAITABLE_HPP
12 #define BOOST_ASIO_IMPL_AWAITABLE_HPP
13 
14 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
15 # pragma once
16 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
17 
18 #include <boost/asio/detail/config.hpp>
19 #include <exception>
20 #include <new>
21 #include <tuple>
22 #include <utility>
23 #include <boost/asio/cancellation_signal.hpp>
24 #include <boost/asio/cancellation_state.hpp>
25 #include <boost/asio/detail/thread_context.hpp>
26 #include <boost/asio/detail/thread_info_base.hpp>
27 #include <boost/asio/detail/throw_error.hpp>
28 #include <boost/asio/detail/type_traits.hpp>
29 #include <boost/asio/error.hpp>
30 #include <boost/asio/post.hpp>
31 #include <boost/system/system_error.hpp>
32 #include <boost/asio/this_coro.hpp>
33 
34 #include <boost/asio/detail/push_options.hpp>
35 
36 namespace boost {
37 namespace asio {
38 namespace detail {
39 
40 struct awaitable_thread_has_context_switched {};
41 
42 // An awaitable_thread represents a thread-of-execution that is composed of one
43 // or more "stack frames", with each frame represented by an awaitable_frame.
44 // All execution occurs in the context of the awaitable_thread's executor. An
45 // awaitable_thread continues to "pump" the stack frames by repeatedly resuming
46 // the top stack frame until the stack is empty, or until ownership of the
47 // stack is transferred to another awaitable_thread object.
48 //
49 //                +------------------------------------+
50 //                | top_of_stack_                      |
51 //                |                                    V
52 // +--------------+---+                            +-----------------+
53 // |                  |                            |                 |
54 // | awaitable_thread |<---------------------------+ awaitable_frame |
55 // |                  |           attached_thread_ |                 |
56 // +--------------+---+           (Set only when   +---+-------------+
57 //                |               frames are being     |
58 //                |               actively pumped      | caller_
59 //                |               by a thread, and     |
60 //                |               then only for        V
61 //                |               the top frame.)  +-----------------+
62 //                |                                |                 |
63 //                |                                | awaitable_frame |
64 //                |                                |                 |
65 //                |                                +---+-------------+
66 //                |                                    |
67 //                |                                    | caller_
68 //                |                                    :
69 //                |                                    :
70 //                |                                    |
71 //                |                                    V
72 //                |                                +-----------------+
73 //                | bottom_of_stack_               |                 |
74 //                +------------------------------->| awaitable_frame |
75 //                                                 |                 |
76 //                                                 +-----------------+
77 
78 template <typename Executor>
79 class awaitable_frame_base
80 {
81 public:
82 #if !defined(BOOST_ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING)
operator new(std::size_t size)83   void* operator new(std::size_t size)
84   {
85     return boost::asio::detail::thread_info_base::allocate(
86         boost::asio::detail::thread_info_base::awaitable_frame_tag(),
87         boost::asio::detail::thread_context::top_of_thread_call_stack(),
88         size);
89   }
90 
operator delete(void * pointer,std::size_t size)91   void operator delete(void* pointer, std::size_t size)
92   {
93     boost::asio::detail::thread_info_base::deallocate(
94         boost::asio::detail::thread_info_base::awaitable_frame_tag(),
95         boost::asio::detail::thread_context::top_of_thread_call_stack(),
96         pointer, size);
97   }
98 #endif // !defined(BOOST_ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING)
99 
100   // The frame starts in a suspended state until the awaitable_thread object
101   // pumps the stack.
initial_suspend()102   auto initial_suspend() noexcept
103   {
104     return suspend_always();
105   }
106 
107   // On final suspension the frame is popped from the top of the stack.
final_suspend()108   auto final_suspend() noexcept
109   {
110     struct result
111     {
112       awaitable_frame_base* this_;
113 
114       bool await_ready() const noexcept
115       {
116         return false;
117       }
118 
119       void await_suspend(coroutine_handle<void>) noexcept
120       {
121         this->this_->pop_frame();
122       }
123 
124       void await_resume() const noexcept
125       {
126       }
127     };
128 
129     return result{this};
130   }
131 
set_except(std::exception_ptr e)132   void set_except(std::exception_ptr e) noexcept
133   {
134     pending_exception_ = e;
135   }
136 
set_error(const boost::system::error_code & ec)137   void set_error(const boost::system::error_code& ec)
138   {
139     this->set_except(std::make_exception_ptr(boost::system::system_error(ec)));
140   }
141 
unhandled_exception()142   void unhandled_exception()
143   {
144     set_except(std::current_exception());
145   }
146 
rethrow_exception()147   void rethrow_exception()
148   {
149     if (pending_exception_)
150     {
151       std::exception_ptr ex = std::exchange(pending_exception_, nullptr);
152       std::rethrow_exception(ex);
153     }
154   }
155 
clear_cancellation_slot()156   void clear_cancellation_slot()
157   {
158     this->attached_thread_->entry_point()->cancellation_state_.slot().clear();
159   }
160 
161   template <typename T>
await_transform(awaitable<T,Executor> a) const162   auto await_transform(awaitable<T, Executor> a) const
163   {
164     if (attached_thread_->entry_point()->throw_if_cancelled_)
165       if (!!attached_thread_->get_cancellation_state().cancelled())
166         do_throw_error(boost::asio::error::operation_aborted, "co_await");
167     return a;
168   }
169 
170   // This await transformation obtains the associated executor of the thread of
171   // execution.
await_transform(this_coro::executor_t)172   auto await_transform(this_coro::executor_t) noexcept
173   {
174     struct result
175     {
176       awaitable_frame_base* this_;
177 
178       bool await_ready() const noexcept
179       {
180         return true;
181       }
182 
183       void await_suspend(coroutine_handle<void>) noexcept
184       {
185       }
186 
187       auto await_resume() const noexcept
188       {
189         return this_->attached_thread_->get_executor();
190       }
191     };
192 
193     return result{this};
194   }
195 
196   // This await transformation obtains the associated cancellation state of the
197   // thread of execution.
await_transform(this_coro::cancellation_state_t)198   auto await_transform(this_coro::cancellation_state_t) noexcept
199   {
200     struct result
201     {
202       awaitable_frame_base* this_;
203 
204       bool await_ready() const noexcept
205       {
206         return true;
207       }
208 
209       void await_suspend(coroutine_handle<void>) noexcept
210       {
211       }
212 
213       auto await_resume() const noexcept
214       {
215         return this_->attached_thread_->get_cancellation_state();
216       }
217     };
218 
219     return result{this};
220   }
221 
222   // This await transformation resets the associated cancellation state.
await_transform(this_coro::reset_cancellation_state_0_t)223   auto await_transform(this_coro::reset_cancellation_state_0_t) noexcept
224   {
225     struct result
226     {
227       awaitable_frame_base* this_;
228 
229       bool await_ready() const noexcept
230       {
231         return true;
232       }
233 
234       void await_suspend(coroutine_handle<void>) noexcept
235       {
236       }
237 
238       auto await_resume() const
239       {
240         return this_->attached_thread_->reset_cancellation_state();
241       }
242     };
243 
244     return result{this};
245   }
246 
247   // This await transformation resets the associated cancellation state.
248   template <typename Filter>
await_transform(this_coro::reset_cancellation_state_1_t<Filter> reset)249   auto await_transform(
250       this_coro::reset_cancellation_state_1_t<Filter> reset) noexcept
251   {
252     struct result
253     {
254       awaitable_frame_base* this_;
255       Filter filter_;
256 
257       bool await_ready() const noexcept
258       {
259         return true;
260       }
261 
262       void await_suspend(coroutine_handle<void>) noexcept
263       {
264       }
265 
266       auto await_resume()
267       {
268         return this_->attached_thread_->reset_cancellation_state(
269             BOOST_ASIO_MOVE_CAST(Filter)(filter_));
270       }
271     };
272 
273     return result{this, BOOST_ASIO_MOVE_CAST(Filter)(reset.filter)};
274   }
275 
276   // This await transformation resets the associated cancellation state.
277   template <typename InFilter, typename OutFilter>
await_transform(this_coro::reset_cancellation_state_2_t<InFilter,OutFilter> reset)278   auto await_transform(
279       this_coro::reset_cancellation_state_2_t<InFilter, OutFilter> reset)
280     noexcept
281   {
282     struct result
283     {
284       awaitable_frame_base* this_;
285       InFilter in_filter_;
286       OutFilter out_filter_;
287 
288       bool await_ready() const noexcept
289       {
290         return true;
291       }
292 
293       void await_suspend(coroutine_handle<void>) noexcept
294       {
295       }
296 
297       auto await_resume()
298       {
299         return this_->attached_thread_->reset_cancellation_state(
300             BOOST_ASIO_MOVE_CAST(InFilter)(in_filter_),
301             BOOST_ASIO_MOVE_CAST(OutFilter)(out_filter_));
302       }
303     };
304 
305     return result{this,
306         BOOST_ASIO_MOVE_CAST(InFilter)(reset.in_filter),
307         BOOST_ASIO_MOVE_CAST(OutFilter)(reset.out_filter)};
308   }
309 
310   // This await transformation determines whether cancellation is propagated as
311   // an exception.
await_transform(this_coro::throw_if_cancelled_0_t)312   auto await_transform(this_coro::throw_if_cancelled_0_t)
313     noexcept
314   {
315     struct result
316     {
317       awaitable_frame_base* this_;
318 
319       bool await_ready() const noexcept
320       {
321         return true;
322       }
323 
324       void await_suspend(coroutine_handle<void>) noexcept
325       {
326       }
327 
328       auto await_resume()
329       {
330         return this_->attached_thread_->throw_if_cancelled();
331       }
332     };
333 
334     return result{this};
335   }
336 
337   // This await transformation sets whether cancellation is propagated as an
338   // exception.
await_transform(this_coro::throw_if_cancelled_1_t throw_if_cancelled)339   auto await_transform(this_coro::throw_if_cancelled_1_t throw_if_cancelled)
340     noexcept
341   {
342     struct result
343     {
344       awaitable_frame_base* this_;
345       bool value_;
346 
347       bool await_ready() const noexcept
348       {
349         return true;
350       }
351 
352       void await_suspend(coroutine_handle<void>) noexcept
353       {
354       }
355 
356       auto await_resume()
357       {
358         this_->attached_thread_->throw_if_cancelled(value_);
359       }
360     };
361 
362     return result{this, throw_if_cancelled.value};
363   }
364 
365   // This await transformation is used to run an async operation's initiation
366   // function object after the coroutine has been suspended. This ensures that
367   // immediate resumption of the coroutine in another thread does not cause a
368   // race condition.
369   template <typename Function>
await_transform(Function f,typename enable_if<is_convertible<typename result_of<Function (awaitable_frame_base *)>::type,awaitable_thread<Executor> * >::value>::type * =0)370   auto await_transform(Function f,
371       typename enable_if<
372         is_convertible<
373           typename result_of<Function(awaitable_frame_base*)>::type,
374           awaitable_thread<Executor>*
375         >::value
376       >::type* = 0)
377   {
378     struct result
379     {
380       Function function_;
381       awaitable_frame_base* this_;
382 
383       bool await_ready() const noexcept
384       {
385         return false;
386       }
387 
388       void await_suspend(coroutine_handle<void>) noexcept
389       {
390         function_(this_);
391       }
392 
393       void await_resume() const noexcept
394       {
395       }
396     };
397 
398     return result{std::move(f), this};
399   }
400 
401   // Access the awaitable thread's has_context_switched_ flag.
await_transform(detail::awaitable_thread_has_context_switched)402   auto await_transform(detail::awaitable_thread_has_context_switched) noexcept
403   {
404     struct result
405     {
406       awaitable_frame_base* this_;
407 
408       bool await_ready() const noexcept
409       {
410         return true;
411       }
412 
413       void await_suspend(coroutine_handle<void>) noexcept
414       {
415       }
416 
417       bool& await_resume() const noexcept
418       {
419         return this_->attached_thread_->entry_point()->has_context_switched_;
420       }
421     };
422 
423     return result{this};
424   }
425 
attach_thread(awaitable_thread<Executor> * handler)426   void attach_thread(awaitable_thread<Executor>* handler) noexcept
427   {
428     attached_thread_ = handler;
429   }
430 
detach_thread()431   awaitable_thread<Executor>* detach_thread() noexcept
432   {
433     attached_thread_->entry_point()->has_context_switched_ = true;
434     return std::exchange(attached_thread_, nullptr);
435   }
436 
push_frame(awaitable_frame_base<Executor> * caller)437   void push_frame(awaitable_frame_base<Executor>* caller) noexcept
438   {
439     caller_ = caller;
440     attached_thread_ = caller_->attached_thread_;
441     attached_thread_->entry_point()->top_of_stack_ = this;
442     caller_->attached_thread_ = nullptr;
443   }
444 
pop_frame()445   void pop_frame() noexcept
446   {
447     if (caller_)
448       caller_->attached_thread_ = attached_thread_;
449     attached_thread_->entry_point()->top_of_stack_ = caller_;
450     attached_thread_ = nullptr;
451     caller_ = nullptr;
452   }
453 
resume()454   void resume()
455   {
456     coro_.resume();
457   }
458 
destroy()459   void destroy()
460   {
461     coro_.destroy();
462   }
463 
464 protected:
465   coroutine_handle<void> coro_ = nullptr;
466   awaitable_thread<Executor>* attached_thread_ = nullptr;
467   awaitable_frame_base<Executor>* caller_ = nullptr;
468   std::exception_ptr pending_exception_ = nullptr;
469 };
470 
471 template <typename T, typename Executor>
472 class awaitable_frame
473   : public awaitable_frame_base<Executor>
474 {
475 public:
awaitable_frame()476   awaitable_frame() noexcept
477   {
478   }
479 
awaitable_frame(awaitable_frame && other)480   awaitable_frame(awaitable_frame&& other) noexcept
481     : awaitable_frame_base<Executor>(std::move(other))
482   {
483   }
484 
~awaitable_frame()485   ~awaitable_frame()
486   {
487     if (has_result_)
488       static_cast<T*>(static_cast<void*>(result_))->~T();
489   }
490 
get_return_object()491   awaitable<T, Executor> get_return_object() noexcept
492   {
493     this->coro_ = coroutine_handle<awaitable_frame>::from_promise(*this);
494     return awaitable<T, Executor>(this);
495   };
496 
497   template <typename U>
return_value(U && u)498   void return_value(U&& u)
499   {
500     new (&result_) T(std::forward<U>(u));
501     has_result_ = true;
502   }
503 
504   template <typename... Us>
return_values(Us &&...us)505   void return_values(Us&&... us)
506   {
507     this->return_value(std::forward_as_tuple(std::forward<Us>(us)...));
508   }
509 
get()510   T get()
511   {
512     this->caller_ = nullptr;
513     this->rethrow_exception();
514     return std::move(*static_cast<T*>(static_cast<void*>(result_)));
515   }
516 
517 private:
518   alignas(T) unsigned char result_[sizeof(T)];
519   bool has_result_ = false;
520 };
521 
522 template <typename Executor>
523 class awaitable_frame<void, Executor>
524   : public awaitable_frame_base<Executor>
525 {
526 public:
get_return_object()527   awaitable<void, Executor> get_return_object()
528   {
529     this->coro_ = coroutine_handle<awaitable_frame>::from_promise(*this);
530     return awaitable<void, Executor>(this);
531   };
532 
return_void()533   void return_void()
534   {
535   }
536 
get()537   void get()
538   {
539     this->caller_ = nullptr;
540     this->rethrow_exception();
541   }
542 };
543 
544 struct awaitable_thread_entry_point {};
545 
546 template <typename Executor>
547 class awaitable_frame<awaitable_thread_entry_point, Executor>
548   : public awaitable_frame_base<Executor>
549 {
550 public:
awaitable_frame()551   awaitable_frame()
552     : top_of_stack_(0),
553       has_executor_(false),
554       has_context_switched_(false),
555       throw_if_cancelled_(true)
556   {
557   }
558 
~awaitable_frame()559   ~awaitable_frame()
560   {
561     if (has_executor_)
562       u_.executor_.~Executor();
563   }
564 
get_return_object()565   awaitable<awaitable_thread_entry_point, Executor> get_return_object()
566   {
567     this->coro_ = coroutine_handle<awaitable_frame>::from_promise(*this);
568     return awaitable<awaitable_thread_entry_point, Executor>(this);
569   };
570 
return_void()571   void return_void()
572   {
573   }
574 
get()575   void get()
576   {
577     this->caller_ = nullptr;
578     this->rethrow_exception();
579   }
580 
581 private:
582   template <typename> friend class awaitable_frame_base;
583   template <typename, typename> friend class awaitable_handler_base;
584   template <typename> friend class awaitable_thread;
585 
586   union u
587   {
u()588     u() {}
~u()589     ~u() {}
590     char c_;
591     Executor executor_;
592   } u_;
593 
594   awaitable_frame_base<Executor>* top_of_stack_;
595   boost::asio::cancellation_slot parent_cancellation_slot_;
596   boost::asio::cancellation_state cancellation_state_;
597   bool has_executor_;
598   bool has_context_switched_;
599   bool throw_if_cancelled_;
600 };
601 
602 template <typename Executor>
603 class awaitable_thread
604 {
605 public:
606   typedef Executor executor_type;
607   typedef cancellation_slot cancellation_slot_type;
608 
609   // Construct from the entry point of a new thread of execution.
awaitable_thread(awaitable<awaitable_thread_entry_point,Executor> p,const Executor & ex,cancellation_slot parent_cancel_slot,cancellation_state cancel_state)610   awaitable_thread(awaitable<awaitable_thread_entry_point, Executor> p,
611       const Executor& ex, cancellation_slot parent_cancel_slot,
612       cancellation_state cancel_state)
613     : bottom_of_stack_(std::move(p))
614   {
615     bottom_of_stack_.frame_->top_of_stack_ = bottom_of_stack_.frame_;
616     new (&bottom_of_stack_.frame_->u_.executor_) Executor(ex);
617     bottom_of_stack_.frame_->has_executor_ = true;
618     bottom_of_stack_.frame_->parent_cancellation_slot_ = parent_cancel_slot;
619     bottom_of_stack_.frame_->cancellation_state_ = cancel_state;
620   }
621 
622   // Transfer ownership from another awaitable_thread.
awaitable_thread(awaitable_thread && other)623   awaitable_thread(awaitable_thread&& other) noexcept
624     : bottom_of_stack_(std::move(other.bottom_of_stack_))
625   {
626   }
627 
628   // Clean up with a last ditch effort to ensure the thread is unwound within
629   // the context of the executor.
~awaitable_thread()630   ~awaitable_thread()
631   {
632     if (bottom_of_stack_.valid())
633     {
634       // Coroutine "stack unwinding" must be performed through the executor.
635       auto* bottom_frame = bottom_of_stack_.frame_;
636       (post)(bottom_frame->u_.executor_,
637           [a = std::move(bottom_of_stack_)]() mutable
638           {
639             (void)awaitable<awaitable_thread_entry_point, Executor>(
640                 std::move(a));
641           });
642     }
643   }
644 
entry_point()645   awaitable_frame<awaitable_thread_entry_point, Executor>* entry_point()
646   {
647     return bottom_of_stack_.frame_;
648   }
649 
get_executor() const650   executor_type get_executor() const noexcept
651   {
652     return bottom_of_stack_.frame_->u_.executor_;
653   }
654 
get_cancellation_state() const655   cancellation_state get_cancellation_state() const noexcept
656   {
657     return bottom_of_stack_.frame_->cancellation_state_;
658   }
659 
reset_cancellation_state()660   void reset_cancellation_state()
661   {
662     bottom_of_stack_.frame_->cancellation_state_ =
663       cancellation_state(bottom_of_stack_.frame_->parent_cancellation_slot_);
664   }
665 
666   template <typename Filter>
reset_cancellation_state(BOOST_ASIO_MOVE_ARG (Filter)filter)667   void reset_cancellation_state(BOOST_ASIO_MOVE_ARG(Filter) filter)
668   {
669     bottom_of_stack_.frame_->cancellation_state_ =
670       cancellation_state(bottom_of_stack_.frame_->parent_cancellation_slot_,
671         BOOST_ASIO_MOVE_CAST(Filter)(filter));
672   }
673 
674   template <typename InFilter, typename OutFilter>
reset_cancellation_state(BOOST_ASIO_MOVE_ARG (InFilter)in_filter,BOOST_ASIO_MOVE_ARG (OutFilter)out_filter)675   void reset_cancellation_state(BOOST_ASIO_MOVE_ARG(InFilter) in_filter,
676       BOOST_ASIO_MOVE_ARG(OutFilter) out_filter)
677   {
678     bottom_of_stack_.frame_->cancellation_state_ =
679       cancellation_state(bottom_of_stack_.frame_->parent_cancellation_slot_,
680         BOOST_ASIO_MOVE_CAST(InFilter)(in_filter),
681         BOOST_ASIO_MOVE_CAST(OutFilter)(out_filter));
682   }
683 
throw_if_cancelled() const684   bool throw_if_cancelled() const
685   {
686     return bottom_of_stack_.frame_->throw_if_cancelled_;
687   }
688 
throw_if_cancelled(bool value)689   void throw_if_cancelled(bool value)
690   {
691     bottom_of_stack_.frame_->throw_if_cancelled_ = value;
692   }
693 
get_cancellation_slot() const694   cancellation_slot_type get_cancellation_slot() const noexcept
695   {
696     return bottom_of_stack_.frame_->cancellation_state_.slot();
697   }
698 
699   // Launch a new thread of execution.
launch()700   void launch()
701   {
702     bottom_of_stack_.frame_->top_of_stack_->attach_thread(this);
703     pump();
704   }
705 
706 protected:
707   template <typename> friend class awaitable_frame_base;
708 
709   // Repeatedly resume the top stack frame until the stack is empty or until it
710   // has been transferred to another resumable_thread object.
pump()711   void pump()
712   {
713     do
714       bottom_of_stack_.frame_->top_of_stack_->resume();
715     while (bottom_of_stack_.frame_ && bottom_of_stack_.frame_->top_of_stack_);
716 
717     if (bottom_of_stack_.frame_)
718     {
719       awaitable<awaitable_thread_entry_point, Executor> a(
720           std::move(bottom_of_stack_));
721       a.frame_->rethrow_exception();
722     }
723   }
724 
725   awaitable<awaitable_thread_entry_point, Executor> bottom_of_stack_;
726 };
727 
728 } // namespace detail
729 } // namespace asio
730 } // namespace boost
731 
732 #if !defined(GENERATING_DOCUMENTATION)
733 # if defined(BOOST_ASIO_HAS_STD_COROUTINE)
734 
735 namespace std {
736 
737 template <typename T, typename Executor, typename... Args>
738 struct coroutine_traits<boost::asio::awaitable<T, Executor>, Args...>
739 {
740   typedef boost::asio::detail::awaitable_frame<T, Executor> promise_type;
741 };
742 
743 } // namespace std
744 
745 # else // defined(BOOST_ASIO_HAS_STD_COROUTINE)
746 
747 namespace std { namespace experimental {
748 
749 template <typename T, typename Executor, typename... Args>
750 struct coroutine_traits<boost::asio::awaitable<T, Executor>, Args...>
751 {
752   typedef boost::asio::detail::awaitable_frame<T, Executor> promise_type;
753 };
754 
755 }} // namespace std::experimental
756 
757 # endif // defined(BOOST_ASIO_HAS_STD_COROUTINE)
758 #endif // !defined(GENERATING_DOCUMENTATION)
759 
760 #include <boost/asio/detail/pop_options.hpp>
761 
762 #endif // BOOST_ASIO_IMPL_AWAITABLE_HPP
763