1 //
2 // impl/awaitable.hpp
3 // ~~~~~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2019 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10 
11 #ifndef BOOST_ASIO_IMPL_AWAITABLE_HPP
12 #define BOOST_ASIO_IMPL_AWAITABLE_HPP
13 
14 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
15 # pragma once
16 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
17 
18 #include <boost/asio/detail/config.hpp>
19 #include <exception>
20 #include <new>
21 #include <tuple>
22 #include <utility>
23 #include <boost/asio/detail/thread_context.hpp>
24 #include <boost/asio/detail/thread_info_base.hpp>
25 #include <boost/asio/detail/type_traits.hpp>
26 #include <boost/asio/post.hpp>
27 #include <boost/system/system_error.hpp>
28 #include <boost/asio/this_coro.hpp>
29 
30 #include <boost/asio/detail/push_options.hpp>
31 
32 namespace boost {
33 namespace asio {
34 namespace detail {
35 
36 // An awaitable_thread represents a thread-of-execution that is composed of one
37 // or more "stack frames", with each frame represented by an awaitable_frame.
38 // All execution occurs in the context of the awaitable_thread's executor. An
39 // awaitable_thread continues to "pump" the stack frames by repeatedly resuming
40 // the top stack frame until the stack is empty, or until ownership of the
41 // stack is transferred to another awaitable_thread object.
42 //
43 //                +------------------------------------+
44 //                | top_of_stack_                      |
45 //                |                                    V
46 // +--------------+---+                            +-----------------+
47 // |                  |                            |                 |
48 // | awaitable_thread |<---------------------------+ awaitable_frame |
49 // |                  |           attached_thread_ |                 |
50 // +--------------+---+           (Set only when   +---+-------------+
51 //                |               frames are being     |
52 //                |               actively pumped      | caller_
53 //                |               by a thread, and     |
54 //                |               then only for        V
55 //                |               the top frame.)  +-----------------+
56 //                |                                |                 |
57 //                |                                | awaitable_frame |
58 //                |                                |                 |
59 //                |                                +---+-------------+
60 //                |                                    |
61 //                |                                    | caller_
62 //                |                                    :
63 //                |                                    :
64 //                |                                    |
65 //                |                                    V
66 //                |                                +-----------------+
67 //                | bottom_of_stack_               |                 |
68 //                +------------------------------->| awaitable_frame |
69 //                                                 |                 |
70 //                                                 +-----------------+
71 
72 template <typename Executor>
73 class awaitable_frame_base
74 {
75 public:
76 #if !defined(BOOST_ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING)
operator new(std::size_t size)77   void* operator new(std::size_t size)
78   {
79     return boost::asio::detail::thread_info_base::allocate(
80         boost::asio::detail::thread_info_base::awaitable_frame_tag(),
81         boost::asio::detail::thread_context::thread_call_stack::top(),
82         size);
83   }
84 
operator delete(void * pointer,std::size_t size)85   void operator delete(void* pointer, std::size_t size)
86   {
87     boost::asio::detail::thread_info_base::deallocate(
88         boost::asio::detail::thread_info_base::awaitable_frame_tag(),
89         boost::asio::detail::thread_context::thread_call_stack::top(),
90         pointer, size);
91   }
92 #endif // !defined(BOOST_ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING)
93 
94   // The frame starts in a suspended state until the awaitable_thread object
95   // pumps the stack.
initial_suspend()96   auto initial_suspend() noexcept
97   {
98     return suspend_always();
99   }
100 
101   // On final suspension the frame is popped from the top of the stack.
final_suspend()102   auto final_suspend() noexcept
103   {
104     struct result
105     {
106       awaitable_frame_base* this_;
107 
108       bool await_ready() const noexcept
109       {
110         return false;
111       }
112 
113       void await_suspend(coroutine_handle<void>) noexcept
114       {
115         this_->pop_frame();
116       }
117 
118       void await_resume() const noexcept
119       {
120       }
121     };
122 
123     return result{this};
124   }
125 
set_except(std::exception_ptr e)126   void set_except(std::exception_ptr e) noexcept
127   {
128     pending_exception_ = e;
129   }
130 
set_error(const boost::system::error_code & ec)131   void set_error(const boost::system::error_code& ec)
132   {
133     this->set_except(std::make_exception_ptr(boost::system::system_error(ec)));
134   }
135 
unhandled_exception()136   void unhandled_exception()
137   {
138     set_except(std::current_exception());
139   }
140 
rethrow_exception()141   void rethrow_exception()
142   {
143     if (pending_exception_)
144     {
145       std::exception_ptr ex = std::exchange(pending_exception_, nullptr);
146       std::rethrow_exception(ex);
147     }
148   }
149 
150   template <typename T>
await_transform(awaitable<T,Executor> a) const151   auto await_transform(awaitable<T, Executor> a) const
152   {
153     return a;
154   }
155 
156   // This await transformation obtains the associated executor of the thread of
157   // execution.
await_transform(this_coro::executor_t)158   auto await_transform(this_coro::executor_t) noexcept
159   {
160     struct result
161     {
162       awaitable_frame_base* this_;
163 
164       bool await_ready() const noexcept
165       {
166         return true;
167       }
168 
169       void await_suspend(coroutine_handle<void>) noexcept
170       {
171       }
172 
173       auto await_resume() const noexcept
174       {
175         return this_->attached_thread_->get_executor();
176       }
177     };
178 
179     return result{this};
180   }
181 
182   // This await transformation is used to run an async operation's initiation
183   // function object after the coroutine has been suspended. This ensures that
184   // immediate resumption of the coroutine in another thread does not cause a
185   // race condition.
186   template <typename Function>
await_transform(Function f,typename enable_if<is_convertible<typename result_of<Function (awaitable_frame_base *)>::type,awaitable_thread<Executor> * >::value>::type * =0)187   auto await_transform(Function f,
188       typename enable_if<
189         is_convertible<
190           typename result_of<Function(awaitable_frame_base*)>::type,
191           awaitable_thread<Executor>*
192         >::value
193       >::type* = 0)
194   {
195     struct result
196     {
197       Function function_;
198       awaitable_frame_base* this_;
199 
200       bool await_ready() const noexcept
201       {
202         return false;
203       }
204 
205       void await_suspend(coroutine_handle<void>) noexcept
206       {
207         function_(this_);
208       }
209 
210       void await_resume() const noexcept
211       {
212       }
213     };
214 
215     return result{std::move(f), this};
216   }
217 
attach_thread(awaitable_thread<Executor> * handler)218   void attach_thread(awaitable_thread<Executor>* handler) noexcept
219   {
220     attached_thread_ = handler;
221   }
222 
detach_thread()223   awaitable_thread<Executor>* detach_thread() noexcept
224   {
225     return std::exchange(attached_thread_, nullptr);
226   }
227 
push_frame(awaitable_frame_base<Executor> * caller)228   void push_frame(awaitable_frame_base<Executor>* caller) noexcept
229   {
230     caller_ = caller;
231     attached_thread_ = caller_->attached_thread_;
232     attached_thread_->top_of_stack_ = this;
233     caller_->attached_thread_ = nullptr;
234   }
235 
pop_frame()236   void pop_frame() noexcept
237   {
238     if (caller_)
239       caller_->attached_thread_ = attached_thread_;
240     attached_thread_->top_of_stack_ = caller_;
241     attached_thread_ = nullptr;
242     caller_ = nullptr;
243   }
244 
resume()245   void resume()
246   {
247     coro_.resume();
248   }
249 
destroy()250   void destroy()
251   {
252     coro_.destroy();
253   }
254 
255 protected:
256   coroutine_handle<void> coro_ = nullptr;
257   awaitable_thread<Executor>* attached_thread_ = nullptr;
258   awaitable_frame_base<Executor>* caller_ = nullptr;
259   std::exception_ptr pending_exception_ = nullptr;
260 };
261 
262 template <typename T, typename Executor>
263 class awaitable_frame
264   : public awaitable_frame_base<Executor>
265 {
266 public:
awaitable_frame()267   awaitable_frame() noexcept
268   {
269   }
270 
awaitable_frame(awaitable_frame && other)271   awaitable_frame(awaitable_frame&& other) noexcept
272     : awaitable_frame_base<Executor>(std::move(other))
273   {
274   }
275 
~awaitable_frame()276   ~awaitable_frame()
277   {
278     if (has_result_)
279       static_cast<T*>(static_cast<void*>(result_))->~T();
280   }
281 
get_return_object()282   awaitable<T, Executor> get_return_object() noexcept
283   {
284     this->coro_ = coroutine_handle<awaitable_frame>::from_promise(*this);
285     return awaitable<T, Executor>(this);
286   };
287 
288   template <typename U>
return_value(U && u)289   void return_value(U&& u)
290   {
291     new (&result_) T(std::forward<U>(u));
292     has_result_ = true;
293   }
294 
295   template <typename... Us>
return_values(Us &&...us)296   void return_values(Us&&... us)
297   {
298     this->return_value(std::forward_as_tuple(std::forward<Us>(us)...));
299   }
300 
get()301   T get()
302   {
303     this->caller_ = nullptr;
304     this->rethrow_exception();
305     return std::move(*static_cast<T*>(static_cast<void*>(result_)));
306   }
307 
308 private:
309   alignas(T) unsigned char result_[sizeof(T)];
310   bool has_result_ = false;
311 };
312 
313 template <typename Executor>
314 class awaitable_frame<void, Executor>
315   : public awaitable_frame_base<Executor>
316 {
317 public:
get_return_object()318   awaitable<void, Executor> get_return_object()
319   {
320     this->coro_ = coroutine_handle<awaitable_frame>::from_promise(*this);
321     return awaitable<void, Executor>(this);
322   };
323 
return_void()324   void return_void()
325   {
326   }
327 
get()328   void get()
329   {
330     this->caller_ = nullptr;
331     this->rethrow_exception();
332   }
333 };
334 
335 template <typename Executor>
336 class awaitable_thread
337 {
338 public:
339   typedef Executor executor_type;
340 
341   // Construct from the entry point of a new thread of execution.
awaitable_thread(awaitable<void,Executor> p,const Executor & ex)342   awaitable_thread(awaitable<void, Executor> p, const Executor& ex)
343     : bottom_of_stack_(std::move(p)),
344       top_of_stack_(bottom_of_stack_.frame_),
345       executor_(ex)
346   {
347   }
348 
349   // Transfer ownership from another awaitable_thread.
awaitable_thread(awaitable_thread && other)350   awaitable_thread(awaitable_thread&& other) noexcept
351     : bottom_of_stack_(std::move(other.bottom_of_stack_)),
352       top_of_stack_(std::exchange(other.top_of_stack_, nullptr)),
353       executor_(std::move(other.executor_))
354   {
355   }
356 
357   // Clean up with a last ditch effort to ensure the thread is unwound within
358   // the context of the executor.
~awaitable_thread()359   ~awaitable_thread()
360   {
361     if (bottom_of_stack_.valid())
362     {
363       // Coroutine "stack unwinding" must be performed through the executor.
364       (post)(executor_,
365           [a = std::move(bottom_of_stack_)]() mutable
366           {
367             awaitable<void, Executor>(std::move(a));
368           });
369     }
370   }
371 
get_executor() const372   executor_type get_executor() const noexcept
373   {
374     return executor_;
375   }
376 
377   // Launch a new thread of execution.
launch()378   void launch()
379   {
380     top_of_stack_->attach_thread(this);
381     pump();
382   }
383 
384 protected:
385   template <typename> friend class awaitable_frame_base;
386 
387   // Repeatedly resume the top stack frame until the stack is empty or until it
388   // has been transferred to another resumable_thread object.
pump()389   void pump()
390   {
391     do top_of_stack_->resume(); while (top_of_stack_);
392     if (bottom_of_stack_.valid())
393     {
394       awaitable<void, Executor> a(std::move(bottom_of_stack_));
395       a.frame_->rethrow_exception();
396     }
397   }
398 
399   awaitable<void, Executor> bottom_of_stack_;
400   awaitable_frame_base<Executor>* top_of_stack_;
401   executor_type executor_;
402 };
403 
404 } // namespace detail
405 } // namespace asio
406 } // namespace boost
407 
408 #if !defined(GENERATING_DOCUMENTATION)
409 
410 namespace std { namespace experimental {
411 
412 template <typename T, typename Executor, typename... Args>
413 struct coroutine_traits<boost::asio::awaitable<T, Executor>, Args...>
414 {
415   typedef boost::asio::detail::awaitable_frame<T, Executor> promise_type;
416 };
417 
418 }} // namespace std::experimental
419 
420 #endif // !defined(GENERATING_DOCUMENTATION)
421 
422 #include <boost/asio/detail/pop_options.hpp>
423 
424 #endif // BOOST_ASIO_IMPL_AWAITABLE_HPP
425