//
// impl/awaitable.hpp
// ~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2019 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

11 #ifndef ASIO_IMPL_AWAITABLE_HPP
12 #define ASIO_IMPL_AWAITABLE_HPP
13 
14 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
15 # pragma once
16 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
17 
18 #include "asio/detail/config.hpp"
19 #include <exception>
20 #include <new>
21 #include <tuple>
22 #include <utility>
23 #include "asio/detail/thread_context.hpp"
24 #include "asio/detail/thread_info_base.hpp"
25 #include "asio/detail/type_traits.hpp"
26 #include "asio/post.hpp"
27 #include "asio/system_error.hpp"
28 #include "asio/this_coro.hpp"
29 
30 #include "asio/detail/push_options.hpp"
31 
32 namespace asio {
33 namespace detail {
34 
// An awaitable_thread represents a thread-of-execution that is composed of one
// or more "stack frames", with each frame represented by an awaitable_frame.
// All execution occurs in the context of the awaitable_thread's executor. An
// awaitable_thread continues to "pump" the stack frames by repeatedly resuming
// the top stack frame until the stack is empty, or until ownership of the
// stack is transferred to another awaitable_thread object.
//
//                +------------------------------------+
//                | top_of_stack_                      |
//                |                                    V
// +--------------+---+                            +-----------------+
// |                  |                            |                 |
// | awaitable_thread |<---------------------------+ awaitable_frame |
// |                  |           attached_thread_ |                 |
// +--------------+---+           (Set only when   +---+-------------+
//                |               frames are being     |
//                |               actively pumped      | caller_
//                |               by a thread, and     |
//                |               then only for        V
//                |               the top frame.)  +-----------------+
//                |                                |                 |
//                |                                | awaitable_frame |
//                |                                |                 |
//                |                                +---+-------------+
//                |                                    |
//                |                                    | caller_
//                |                                    :
//                |                                    :
//                |                                    |
//                |                                    V
//                |                                +-----------------+
//                | bottom_of_stack_               |                 |
//                +------------------------------->| awaitable_frame |
//                                                 |                 |
//                                                 +-----------------+

71 template <typename Executor>
72 class awaitable_frame_base
73 {
74 public:
75 #if !defined(ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING)
operator new(std::size_t size)76   void* operator new(std::size_t size)
77   {
78     return asio::detail::thread_info_base::allocate(
79         asio::detail::thread_info_base::awaitable_frame_tag(),
80         asio::detail::thread_context::thread_call_stack::top(),
81         size);
82   }
83 
operator delete(void * pointer,std::size_t size)84   void operator delete(void* pointer, std::size_t size)
85   {
86     asio::detail::thread_info_base::deallocate(
87         asio::detail::thread_info_base::awaitable_frame_tag(),
88         asio::detail::thread_context::thread_call_stack::top(),
89         pointer, size);
90   }
91 #endif // !defined(ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING)
92 
93   // The frame starts in a suspended state until the awaitable_thread object
94   // pumps the stack.
initial_suspend()95   auto initial_suspend() noexcept
96   {
97     return suspend_always();
98   }
99 
100   // On final suspension the frame is popped from the top of the stack.
final_suspend()101   auto final_suspend() noexcept
102   {
103     struct result
104     {
105       awaitable_frame_base* this_;
106 
107       bool await_ready() const noexcept
108       {
109         return false;
110       }
111 
112       void await_suspend(coroutine_handle<void>) noexcept
113       {
114         this_->pop_frame();
115       }
116 
117       void await_resume() const noexcept
118       {
119       }
120     };
121 
122     return result{this};
123   }
124 
set_except(std::exception_ptr e)125   void set_except(std::exception_ptr e) noexcept
126   {
127     pending_exception_ = e;
128   }
129 
set_error(const asio::error_code & ec)130   void set_error(const asio::error_code& ec)
131   {
132     this->set_except(std::make_exception_ptr(asio::system_error(ec)));
133   }
134 
unhandled_exception()135   void unhandled_exception()
136   {
137     set_except(std::current_exception());
138   }
139 
rethrow_exception()140   void rethrow_exception()
141   {
142     if (pending_exception_)
143     {
144       std::exception_ptr ex = std::exchange(pending_exception_, nullptr);
145       std::rethrow_exception(ex);
146     }
147   }
148 
149   template <typename T>
await_transform(awaitable<T,Executor> a) const150   auto await_transform(awaitable<T, Executor> a) const
151   {
152     return a;
153   }
154 
155   // This await transformation obtains the associated executor of the thread of
156   // execution.
await_transform(this_coro::executor_t)157   auto await_transform(this_coro::executor_t) noexcept
158   {
159     struct result
160     {
161       awaitable_frame_base* this_;
162 
163       bool await_ready() const noexcept
164       {
165         return true;
166       }
167 
168       void await_suspend(coroutine_handle<void>) noexcept
169       {
170       }
171 
172       auto await_resume() const noexcept
173       {
174         return this_->attached_thread_->get_executor();
175       }
176     };
177 
178     return result{this};
179   }
180 
181   // This await transformation is used to run an async operation's initiation
182   // function object after the coroutine has been suspended. This ensures that
183   // immediate resumption of the coroutine in another thread does not cause a
184   // race condition.
185   template <typename Function>
await_transform(Function f,typename enable_if<is_convertible<typename result_of<Function (awaitable_frame_base *)>::type,awaitable_thread<Executor> * >::value>::type * =0)186   auto await_transform(Function f,
187       typename enable_if<
188         is_convertible<
189           typename result_of<Function(awaitable_frame_base*)>::type,
190           awaitable_thread<Executor>*
191         >::value
192       >::type* = 0)
193   {
194     struct result
195     {
196       Function function_;
197       awaitable_frame_base* this_;
198 
199       bool await_ready() const noexcept
200       {
201         return false;
202       }
203 
204       void await_suspend(coroutine_handle<void>) noexcept
205       {
206         function_(this_);
207       }
208 
209       void await_resume() const noexcept
210       {
211       }
212     };
213 
214     return result{std::move(f), this};
215   }
216 
attach_thread(awaitable_thread<Executor> * handler)217   void attach_thread(awaitable_thread<Executor>* handler) noexcept
218   {
219     attached_thread_ = handler;
220   }
221 
detach_thread()222   awaitable_thread<Executor>* detach_thread() noexcept
223   {
224     return std::exchange(attached_thread_, nullptr);
225   }
226 
push_frame(awaitable_frame_base<Executor> * caller)227   void push_frame(awaitable_frame_base<Executor>* caller) noexcept
228   {
229     caller_ = caller;
230     attached_thread_ = caller_->attached_thread_;
231     attached_thread_->top_of_stack_ = this;
232     caller_->attached_thread_ = nullptr;
233   }
234 
pop_frame()235   void pop_frame() noexcept
236   {
237     if (caller_)
238       caller_->attached_thread_ = attached_thread_;
239     attached_thread_->top_of_stack_ = caller_;
240     attached_thread_ = nullptr;
241     caller_ = nullptr;
242   }
243 
resume()244   void resume()
245   {
246     coro_.resume();
247   }
248 
destroy()249   void destroy()
250   {
251     coro_.destroy();
252   }
253 
254 protected:
255   coroutine_handle<void> coro_ = nullptr;
256   awaitable_thread<Executor>* attached_thread_ = nullptr;
257   awaitable_frame_base<Executor>* caller_ = nullptr;
258   std::exception_ptr pending_exception_ = nullptr;
259 };
260 
261 template <typename T, typename Executor>
262 class awaitable_frame
263   : public awaitable_frame_base<Executor>
264 {
265 public:
awaitable_frame()266   awaitable_frame() noexcept
267   {
268   }
269 
awaitable_frame(awaitable_frame && other)270   awaitable_frame(awaitable_frame&& other) noexcept
271     : awaitable_frame_base<Executor>(std::move(other))
272   {
273   }
274 
~awaitable_frame()275   ~awaitable_frame()
276   {
277     if (has_result_)
278       static_cast<T*>(static_cast<void*>(result_))->~T();
279   }
280 
get_return_object()281   awaitable<T, Executor> get_return_object() noexcept
282   {
283     this->coro_ = coroutine_handle<awaitable_frame>::from_promise(*this);
284     return awaitable<T, Executor>(this);
285   };
286 
287   template <typename U>
return_value(U && u)288   void return_value(U&& u)
289   {
290     new (&result_) T(std::forward<U>(u));
291     has_result_ = true;
292   }
293 
294   template <typename... Us>
return_values(Us &&...us)295   void return_values(Us&&... us)
296   {
297     this->return_value(std::forward_as_tuple(std::forward<Us>(us)...));
298   }
299 
get()300   T get()
301   {
302     this->caller_ = nullptr;
303     this->rethrow_exception();
304     return std::move(*static_cast<T*>(static_cast<void*>(result_)));
305   }
306 
307 private:
308   alignas(T) unsigned char result_[sizeof(T)];
309   bool has_result_ = false;
310 };
311 
312 template <typename Executor>
313 class awaitable_frame<void, Executor>
314   : public awaitable_frame_base<Executor>
315 {
316 public:
get_return_object()317   awaitable<void, Executor> get_return_object()
318   {
319     this->coro_ = coroutine_handle<awaitable_frame>::from_promise(*this);
320     return awaitable<void, Executor>(this);
321   };
322 
return_void()323   void return_void()
324   {
325   }
326 
get()327   void get()
328   {
329     this->caller_ = nullptr;
330     this->rethrow_exception();
331   }
332 };
333 
334 template <typename Executor>
335 class awaitable_thread
336 {
337 public:
338   typedef Executor executor_type;
339 
340   // Construct from the entry point of a new thread of execution.
awaitable_thread(awaitable<void,Executor> p,const Executor & ex)341   awaitable_thread(awaitable<void, Executor> p, const Executor& ex)
342     : bottom_of_stack_(std::move(p)),
343       top_of_stack_(bottom_of_stack_.frame_),
344       executor_(ex)
345   {
346   }
347 
348   // Transfer ownership from another awaitable_thread.
awaitable_thread(awaitable_thread && other)349   awaitable_thread(awaitable_thread&& other) noexcept
350     : bottom_of_stack_(std::move(other.bottom_of_stack_)),
351       top_of_stack_(std::exchange(other.top_of_stack_, nullptr)),
352       executor_(std::move(other.executor_))
353   {
354   }
355 
356   // Clean up with a last ditch effort to ensure the thread is unwound within
357   // the context of the executor.
~awaitable_thread()358   ~awaitable_thread()
359   {
360     if (bottom_of_stack_.valid())
361     {
362       // Coroutine "stack unwinding" must be performed through the executor.
363       (post)(executor_,
364           [a = std::move(bottom_of_stack_)]() mutable
365           {
366             awaitable<void, Executor>(std::move(a));
367           });
368     }
369   }
370 
get_executor() const371   executor_type get_executor() const noexcept
372   {
373     return executor_;
374   }
375 
376   // Launch a new thread of execution.
launch()377   void launch()
378   {
379     top_of_stack_->attach_thread(this);
380     pump();
381   }
382 
383 protected:
384   template <typename> friend class awaitable_frame_base;
385 
386   // Repeatedly resume the top stack frame until the stack is empty or until it
387   // has been transferred to another resumable_thread object.
pump()388   void pump()
389   {
390     do top_of_stack_->resume(); while (top_of_stack_);
391     if (bottom_of_stack_.valid())
392     {
393       awaitable<void, Executor> a(std::move(bottom_of_stack_));
394       a.frame_->rethrow_exception();
395     }
396   }
397 
398   awaitable<void, Executor> bottom_of_stack_;
399   awaitable_frame_base<Executor>* top_of_stack_;
400   executor_type executor_;
401 };
402 
403 } // namespace detail
404 } // namespace asio
405 
406 #if !defined(GENERATING_DOCUMENTATION)
407 
408 namespace std { namespace experimental {
409 
410 template <typename T, typename Executor, typename... Args>
411 struct coroutine_traits<asio::awaitable<T, Executor>, Args...>
412 {
413   typedef asio::detail::awaitable_frame<T, Executor> promise_type;
414 };
415 
416 }} // namespace std::experimental
417 
418 #endif // !defined(GENERATING_DOCUMENTATION)
419 
420 #include "asio/detail/pop_options.hpp"
421 
422 #endif // ASIO_IMPL_AWAITABLE_HPP
423