1 // 2 // impl/awaitable.hpp 3 // ~~~~~~~~~~~~~~~~~~ 4 // 5 // Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com) 6 // 7 // Distributed under the Boost Software License, Version 1.0. (See accompanying 8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 9 // 10 11 #ifndef BOOST_ASIO_IMPL_AWAITABLE_HPP 12 #define BOOST_ASIO_IMPL_AWAITABLE_HPP 13 14 #if defined(_MSC_VER) && (_MSC_VER >= 1200) 15 # pragma once 16 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) 17 18 #include <boost/asio/detail/config.hpp> 19 #include <exception> 20 #include <new> 21 #include <tuple> 22 #include <utility> 23 #include <boost/asio/cancellation_signal.hpp> 24 #include <boost/asio/cancellation_state.hpp> 25 #include <boost/asio/detail/thread_context.hpp> 26 #include <boost/asio/detail/thread_info_base.hpp> 27 #include <boost/asio/detail/throw_error.hpp> 28 #include <boost/asio/detail/type_traits.hpp> 29 #include <boost/asio/error.hpp> 30 #include <boost/asio/post.hpp> 31 #include <boost/system/system_error.hpp> 32 #include <boost/asio/this_coro.hpp> 33 34 #include <boost/asio/detail/push_options.hpp> 35 36 namespace boost { 37 namespace asio { 38 namespace detail { 39 40 struct awaitable_thread_has_context_switched {}; 41 42 // An awaitable_thread represents a thread-of-execution that is composed of one 43 // or more "stack frames", with each frame represented by an awaitable_frame. 44 // All execution occurs in the context of the awaitable_thread's executor. An 45 // awaitable_thread continues to "pump" the stack frames by repeatedly resuming 46 // the top stack frame until the stack is empty, or until ownership of the 47 // stack is transferred to another awaitable_thread object. 48 // 49 // +------------------------------------+ 50 // | top_of_stack_ | 51 // | V 52 // +--------------+---+ +-----------------+ 53 // | | | | 54 // | awaitable_thread |<---------------------------+ awaitable_frame | 55 // | | attached_thread_ | | 56 // +--------------+---+ (Set only when +---+-------------+ 57 // | frames are being | 58 // | actively pumped | caller_ 59 // | by a thread, and | 60 // | then only for V 61 // | the top frame.) +-----------------+ 62 // | | | 63 // | | awaitable_frame | 64 // | | | 65 // | +---+-------------+ 66 // | | 67 // | | caller_ 68 // | : 69 // | : 70 // | | 71 // | V 72 // | +-----------------+ 73 // | bottom_of_stack_ | | 74 // +------------------------------->| awaitable_frame | 75 // | | 76 // +-----------------+ 77 78 template <typename Executor> 79 class awaitable_frame_base 80 { 81 public: 82 #if !defined(BOOST_ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING) operator new(std::size_t size)83 void* operator new(std::size_t size) 84 { 85 return boost::asio::detail::thread_info_base::allocate( 86 boost::asio::detail::thread_info_base::awaitable_frame_tag(), 87 boost::asio::detail::thread_context::top_of_thread_call_stack(), 88 size); 89 } 90 operator delete(void * pointer,std::size_t size)91 void operator delete(void* pointer, std::size_t size) 92 { 93 boost::asio::detail::thread_info_base::deallocate( 94 boost::asio::detail::thread_info_base::awaitable_frame_tag(), 95 boost::asio::detail::thread_context::top_of_thread_call_stack(), 96 pointer, size); 97 } 98 #endif // !defined(BOOST_ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING) 99 100 // The frame starts in a suspended state until the awaitable_thread object 101 // pumps the stack. initial_suspend()102 auto initial_suspend() noexcept 103 { 104 return suspend_always(); 105 } 106 107 // On final suspension the frame is popped from the top of the stack. final_suspend()108 auto final_suspend() noexcept 109 { 110 struct result 111 { 112 awaitable_frame_base* this_; 113 114 bool await_ready() const noexcept 115 { 116 return false; 117 } 118 119 void await_suspend(coroutine_handle<void>) noexcept 120 { 121 this->this_->pop_frame(); 122 } 123 124 void await_resume() const noexcept 125 { 126 } 127 }; 128 129 return result{this}; 130 } 131 set_except(std::exception_ptr e)132 void set_except(std::exception_ptr e) noexcept 133 { 134 pending_exception_ = e; 135 } 136 set_error(const boost::system::error_code & ec)137 void set_error(const boost::system::error_code& ec) 138 { 139 this->set_except(std::make_exception_ptr(boost::system::system_error(ec))); 140 } 141 unhandled_exception()142 void unhandled_exception() 143 { 144 set_except(std::current_exception()); 145 } 146 rethrow_exception()147 void rethrow_exception() 148 { 149 if (pending_exception_) 150 { 151 std::exception_ptr ex = std::exchange(pending_exception_, nullptr); 152 std::rethrow_exception(ex); 153 } 154 } 155 clear_cancellation_slot()156 void clear_cancellation_slot() 157 { 158 this->attached_thread_->entry_point()->cancellation_state_.slot().clear(); 159 } 160 161 template <typename T> await_transform(awaitable<T,Executor> a) const162 auto await_transform(awaitable<T, Executor> a) const 163 { 164 if (attached_thread_->entry_point()->throw_if_cancelled_) 165 if (!!attached_thread_->get_cancellation_state().cancelled()) 166 do_throw_error(boost::asio::error::operation_aborted, "co_await"); 167 return a; 168 } 169 170 // This await transformation obtains the associated executor of the thread of 171 // execution. await_transform(this_coro::executor_t)172 auto await_transform(this_coro::executor_t) noexcept 173 { 174 struct result 175 { 176 awaitable_frame_base* this_; 177 178 bool await_ready() const noexcept 179 { 180 return true; 181 } 182 183 void await_suspend(coroutine_handle<void>) noexcept 184 { 185 } 186 187 auto await_resume() const noexcept 188 { 189 return this_->attached_thread_->get_executor(); 190 } 191 }; 192 193 return result{this}; 194 } 195 196 // This await transformation obtains the associated cancellation state of the 197 // thread of execution. await_transform(this_coro::cancellation_state_t)198 auto await_transform(this_coro::cancellation_state_t) noexcept 199 { 200 struct result 201 { 202 awaitable_frame_base* this_; 203 204 bool await_ready() const noexcept 205 { 206 return true; 207 } 208 209 void await_suspend(coroutine_handle<void>) noexcept 210 { 211 } 212 213 auto await_resume() const noexcept 214 { 215 return this_->attached_thread_->get_cancellation_state(); 216 } 217 }; 218 219 return result{this}; 220 } 221 222 // This await transformation resets the associated cancellation state. await_transform(this_coro::reset_cancellation_state_0_t)223 auto await_transform(this_coro::reset_cancellation_state_0_t) noexcept 224 { 225 struct result 226 { 227 awaitable_frame_base* this_; 228 229 bool await_ready() const noexcept 230 { 231 return true; 232 } 233 234 void await_suspend(coroutine_handle<void>) noexcept 235 { 236 } 237 238 auto await_resume() const 239 { 240 return this_->attached_thread_->reset_cancellation_state(); 241 } 242 }; 243 244 return result{this}; 245 } 246 247 // This await transformation resets the associated cancellation state. 248 template <typename Filter> await_transform(this_coro::reset_cancellation_state_1_t<Filter> reset)249 auto await_transform( 250 this_coro::reset_cancellation_state_1_t<Filter> reset) noexcept 251 { 252 struct result 253 { 254 awaitable_frame_base* this_; 255 Filter filter_; 256 257 bool await_ready() const noexcept 258 { 259 return true; 260 } 261 262 void await_suspend(coroutine_handle<void>) noexcept 263 { 264 } 265 266 auto await_resume() 267 { 268 return this_->attached_thread_->reset_cancellation_state( 269 BOOST_ASIO_MOVE_CAST(Filter)(filter_)); 270 } 271 }; 272 273 return result{this, BOOST_ASIO_MOVE_CAST(Filter)(reset.filter)}; 274 } 275 276 // This await transformation resets the associated cancellation state. 277 template <typename InFilter, typename OutFilter> await_transform(this_coro::reset_cancellation_state_2_t<InFilter,OutFilter> reset)278 auto await_transform( 279 this_coro::reset_cancellation_state_2_t<InFilter, OutFilter> reset) 280 noexcept 281 { 282 struct result 283 { 284 awaitable_frame_base* this_; 285 InFilter in_filter_; 286 OutFilter out_filter_; 287 288 bool await_ready() const noexcept 289 { 290 return true; 291 } 292 293 void await_suspend(coroutine_handle<void>) noexcept 294 { 295 } 296 297 auto await_resume() 298 { 299 return this_->attached_thread_->reset_cancellation_state( 300 BOOST_ASIO_MOVE_CAST(InFilter)(in_filter_), 301 BOOST_ASIO_MOVE_CAST(OutFilter)(out_filter_)); 302 } 303 }; 304 305 return result{this, 306 BOOST_ASIO_MOVE_CAST(InFilter)(reset.in_filter), 307 BOOST_ASIO_MOVE_CAST(OutFilter)(reset.out_filter)}; 308 } 309 310 // This await transformation determines whether cancellation is propagated as 311 // an exception. await_transform(this_coro::throw_if_cancelled_0_t)312 auto await_transform(this_coro::throw_if_cancelled_0_t) 313 noexcept 314 { 315 struct result 316 { 317 awaitable_frame_base* this_; 318 319 bool await_ready() const noexcept 320 { 321 return true; 322 } 323 324 void await_suspend(coroutine_handle<void>) noexcept 325 { 326 } 327 328 auto await_resume() 329 { 330 return this_->attached_thread_->throw_if_cancelled(); 331 } 332 }; 333 334 return result{this}; 335 } 336 337 // This await transformation sets whether cancellation is propagated as an 338 // exception. await_transform(this_coro::throw_if_cancelled_1_t throw_if_cancelled)339 auto await_transform(this_coro::throw_if_cancelled_1_t throw_if_cancelled) 340 noexcept 341 { 342 struct result 343 { 344 awaitable_frame_base* this_; 345 bool value_; 346 347 bool await_ready() const noexcept 348 { 349 return true; 350 } 351 352 void await_suspend(coroutine_handle<void>) noexcept 353 { 354 } 355 356 auto await_resume() 357 { 358 this_->attached_thread_->throw_if_cancelled(value_); 359 } 360 }; 361 362 return result{this, throw_if_cancelled.value}; 363 } 364 365 // This await transformation is used to run an async operation's initiation 366 // function object after the coroutine has been suspended. This ensures that 367 // immediate resumption of the coroutine in another thread does not cause a 368 // race condition. 369 template <typename Function> await_transform(Function f,typename enable_if<is_convertible<typename result_of<Function (awaitable_frame_base *)>::type,awaitable_thread<Executor> * >::value>::type * =0)370 auto await_transform(Function f, 371 typename enable_if< 372 is_convertible< 373 typename result_of<Function(awaitable_frame_base*)>::type, 374 awaitable_thread<Executor>* 375 >::value 376 >::type* = 0) 377 { 378 struct result 379 { 380 Function function_; 381 awaitable_frame_base* this_; 382 383 bool await_ready() const noexcept 384 { 385 return false; 386 } 387 388 void await_suspend(coroutine_handle<void>) noexcept 389 { 390 function_(this_); 391 } 392 393 void await_resume() const noexcept 394 { 395 } 396 }; 397 398 return result{std::move(f), this}; 399 } 400 401 // Access the awaitable thread's has_context_switched_ flag. await_transform(detail::awaitable_thread_has_context_switched)402 auto await_transform(detail::awaitable_thread_has_context_switched) noexcept 403 { 404 struct result 405 { 406 awaitable_frame_base* this_; 407 408 bool await_ready() const noexcept 409 { 410 return true; 411 } 412 413 void await_suspend(coroutine_handle<void>) noexcept 414 { 415 } 416 417 bool& await_resume() const noexcept 418 { 419 return this_->attached_thread_->entry_point()->has_context_switched_; 420 } 421 }; 422 423 return result{this}; 424 } 425 attach_thread(awaitable_thread<Executor> * handler)426 void attach_thread(awaitable_thread<Executor>* handler) noexcept 427 { 428 attached_thread_ = handler; 429 } 430 detach_thread()431 awaitable_thread<Executor>* detach_thread() noexcept 432 { 433 attached_thread_->entry_point()->has_context_switched_ = true; 434 return std::exchange(attached_thread_, nullptr); 435 } 436 push_frame(awaitable_frame_base<Executor> * caller)437 void push_frame(awaitable_frame_base<Executor>* caller) noexcept 438 { 439 caller_ = caller; 440 attached_thread_ = caller_->attached_thread_; 441 attached_thread_->entry_point()->top_of_stack_ = this; 442 caller_->attached_thread_ = nullptr; 443 } 444 pop_frame()445 void pop_frame() noexcept 446 { 447 if (caller_) 448 caller_->attached_thread_ = attached_thread_; 449 attached_thread_->entry_point()->top_of_stack_ = caller_; 450 attached_thread_ = nullptr; 451 caller_ = nullptr; 452 } 453 resume()454 void resume() 455 { 456 coro_.resume(); 457 } 458 destroy()459 void destroy() 460 { 461 coro_.destroy(); 462 } 463 464 protected: 465 coroutine_handle<void> coro_ = nullptr; 466 awaitable_thread<Executor>* attached_thread_ = nullptr; 467 awaitable_frame_base<Executor>* caller_ = nullptr; 468 std::exception_ptr pending_exception_ = nullptr; 469 }; 470 471 template <typename T, typename Executor> 472 class awaitable_frame 473 : public awaitable_frame_base<Executor> 474 { 475 public: awaitable_frame()476 awaitable_frame() noexcept 477 { 478 } 479 awaitable_frame(awaitable_frame && other)480 awaitable_frame(awaitable_frame&& other) noexcept 481 : awaitable_frame_base<Executor>(std::move(other)) 482 { 483 } 484 ~awaitable_frame()485 ~awaitable_frame() 486 { 487 if (has_result_) 488 static_cast<T*>(static_cast<void*>(result_))->~T(); 489 } 490 get_return_object()491 awaitable<T, Executor> get_return_object() noexcept 492 { 493 this->coro_ = coroutine_handle<awaitable_frame>::from_promise(*this); 494 return awaitable<T, Executor>(this); 495 }; 496 497 template <typename U> return_value(U && u)498 void return_value(U&& u) 499 { 500 new (&result_) T(std::forward<U>(u)); 501 has_result_ = true; 502 } 503 504 template <typename... Us> return_values(Us &&...us)505 void return_values(Us&&... us) 506 { 507 this->return_value(std::forward_as_tuple(std::forward<Us>(us)...)); 508 } 509 get()510 T get() 511 { 512 this->caller_ = nullptr; 513 this->rethrow_exception(); 514 return std::move(*static_cast<T*>(static_cast<void*>(result_))); 515 } 516 517 private: 518 alignas(T) unsigned char result_[sizeof(T)]; 519 bool has_result_ = false; 520 }; 521 522 template <typename Executor> 523 class awaitable_frame<void, Executor> 524 : public awaitable_frame_base<Executor> 525 { 526 public: get_return_object()527 awaitable<void, Executor> get_return_object() 528 { 529 this->coro_ = coroutine_handle<awaitable_frame>::from_promise(*this); 530 return awaitable<void, Executor>(this); 531 }; 532 return_void()533 void return_void() 534 { 535 } 536 get()537 void get() 538 { 539 this->caller_ = nullptr; 540 this->rethrow_exception(); 541 } 542 }; 543 544 struct awaitable_thread_entry_point {}; 545 546 template <typename Executor> 547 class awaitable_frame<awaitable_thread_entry_point, Executor> 548 : public awaitable_frame_base<Executor> 549 { 550 public: awaitable_frame()551 awaitable_frame() 552 : top_of_stack_(0), 553 has_executor_(false), 554 has_context_switched_(false), 555 throw_if_cancelled_(true) 556 { 557 } 558 ~awaitable_frame()559 ~awaitable_frame() 560 { 561 if (has_executor_) 562 u_.executor_.~Executor(); 563 } 564 get_return_object()565 awaitable<awaitable_thread_entry_point, Executor> get_return_object() 566 { 567 this->coro_ = coroutine_handle<awaitable_frame>::from_promise(*this); 568 return awaitable<awaitable_thread_entry_point, Executor>(this); 569 }; 570 return_void()571 void return_void() 572 { 573 } 574 get()575 void get() 576 { 577 this->caller_ = nullptr; 578 this->rethrow_exception(); 579 } 580 581 private: 582 template <typename> friend class awaitable_frame_base; 583 template <typename, typename> friend class awaitable_handler_base; 584 template <typename> friend class awaitable_thread; 585 586 union u 587 { u()588 u() {} ~u()589 ~u() {} 590 char c_; 591 Executor executor_; 592 } u_; 593 594 awaitable_frame_base<Executor>* top_of_stack_; 595 boost::asio::cancellation_slot parent_cancellation_slot_; 596 boost::asio::cancellation_state cancellation_state_; 597 bool has_executor_; 598 bool has_context_switched_; 599 bool throw_if_cancelled_; 600 }; 601 602 template <typename Executor> 603 class awaitable_thread 604 { 605 public: 606 typedef Executor executor_type; 607 typedef cancellation_slot cancellation_slot_type; 608 609 // Construct from the entry point of a new thread of execution. awaitable_thread(awaitable<awaitable_thread_entry_point,Executor> p,const Executor & ex,cancellation_slot parent_cancel_slot,cancellation_state cancel_state)610 awaitable_thread(awaitable<awaitable_thread_entry_point, Executor> p, 611 const Executor& ex, cancellation_slot parent_cancel_slot, 612 cancellation_state cancel_state) 613 : bottom_of_stack_(std::move(p)) 614 { 615 bottom_of_stack_.frame_->top_of_stack_ = bottom_of_stack_.frame_; 616 new (&bottom_of_stack_.frame_->u_.executor_) Executor(ex); 617 bottom_of_stack_.frame_->has_executor_ = true; 618 bottom_of_stack_.frame_->parent_cancellation_slot_ = parent_cancel_slot; 619 bottom_of_stack_.frame_->cancellation_state_ = cancel_state; 620 } 621 622 // Transfer ownership from another awaitable_thread. awaitable_thread(awaitable_thread && other)623 awaitable_thread(awaitable_thread&& other) noexcept 624 : bottom_of_stack_(std::move(other.bottom_of_stack_)) 625 { 626 } 627 628 // Clean up with a last ditch effort to ensure the thread is unwound within 629 // the context of the executor. ~awaitable_thread()630 ~awaitable_thread() 631 { 632 if (bottom_of_stack_.valid()) 633 { 634 // Coroutine "stack unwinding" must be performed through the executor. 635 auto* bottom_frame = bottom_of_stack_.frame_; 636 (post)(bottom_frame->u_.executor_, 637 [a = std::move(bottom_of_stack_)]() mutable 638 { 639 (void)awaitable<awaitable_thread_entry_point, Executor>( 640 std::move(a)); 641 }); 642 } 643 } 644 entry_point()645 awaitable_frame<awaitable_thread_entry_point, Executor>* entry_point() 646 { 647 return bottom_of_stack_.frame_; 648 } 649 get_executor() const650 executor_type get_executor() const noexcept 651 { 652 return bottom_of_stack_.frame_->u_.executor_; 653 } 654 get_cancellation_state() const655 cancellation_state get_cancellation_state() const noexcept 656 { 657 return bottom_of_stack_.frame_->cancellation_state_; 658 } 659 reset_cancellation_state()660 void reset_cancellation_state() 661 { 662 bottom_of_stack_.frame_->cancellation_state_ = 663 cancellation_state(bottom_of_stack_.frame_->parent_cancellation_slot_); 664 } 665 666 template <typename Filter> reset_cancellation_state(BOOST_ASIO_MOVE_ARG (Filter)filter)667 void reset_cancellation_state(BOOST_ASIO_MOVE_ARG(Filter) filter) 668 { 669 bottom_of_stack_.frame_->cancellation_state_ = 670 cancellation_state(bottom_of_stack_.frame_->parent_cancellation_slot_, 671 BOOST_ASIO_MOVE_CAST(Filter)(filter)); 672 } 673 674 template <typename InFilter, typename OutFilter> reset_cancellation_state(BOOST_ASIO_MOVE_ARG (InFilter)in_filter,BOOST_ASIO_MOVE_ARG (OutFilter)out_filter)675 void reset_cancellation_state(BOOST_ASIO_MOVE_ARG(InFilter) in_filter, 676 BOOST_ASIO_MOVE_ARG(OutFilter) out_filter) 677 { 678 bottom_of_stack_.frame_->cancellation_state_ = 679 cancellation_state(bottom_of_stack_.frame_->parent_cancellation_slot_, 680 BOOST_ASIO_MOVE_CAST(InFilter)(in_filter), 681 BOOST_ASIO_MOVE_CAST(OutFilter)(out_filter)); 682 } 683 throw_if_cancelled() const684 bool throw_if_cancelled() const 685 { 686 return bottom_of_stack_.frame_->throw_if_cancelled_; 687 } 688 throw_if_cancelled(bool value)689 void throw_if_cancelled(bool value) 690 { 691 bottom_of_stack_.frame_->throw_if_cancelled_ = value; 692 } 693 get_cancellation_slot() const694 cancellation_slot_type get_cancellation_slot() const noexcept 695 { 696 return bottom_of_stack_.frame_->cancellation_state_.slot(); 697 } 698 699 // Launch a new thread of execution. launch()700 void launch() 701 { 702 bottom_of_stack_.frame_->top_of_stack_->attach_thread(this); 703 pump(); 704 } 705 706 protected: 707 template <typename> friend class awaitable_frame_base; 708 709 // Repeatedly resume the top stack frame until the stack is empty or until it 710 // has been transferred to another resumable_thread object. pump()711 void pump() 712 { 713 do 714 bottom_of_stack_.frame_->top_of_stack_->resume(); 715 while (bottom_of_stack_.frame_ && bottom_of_stack_.frame_->top_of_stack_); 716 717 if (bottom_of_stack_.frame_) 718 { 719 awaitable<awaitable_thread_entry_point, Executor> a( 720 std::move(bottom_of_stack_)); 721 a.frame_->rethrow_exception(); 722 } 723 } 724 725 awaitable<awaitable_thread_entry_point, Executor> bottom_of_stack_; 726 }; 727 728 } // namespace detail 729 } // namespace asio 730 } // namespace boost 731 732 #if !defined(GENERATING_DOCUMENTATION) 733 # if defined(BOOST_ASIO_HAS_STD_COROUTINE) 734 735 namespace std { 736 737 template <typename T, typename Executor, typename... Args> 738 struct coroutine_traits<boost::asio::awaitable<T, Executor>, Args...> 739 { 740 typedef boost::asio::detail::awaitable_frame<T, Executor> promise_type; 741 }; 742 743 } // namespace std 744 745 # else // defined(BOOST_ASIO_HAS_STD_COROUTINE) 746 747 namespace std { namespace experimental { 748 749 template <typename T, typename Executor, typename... Args> 750 struct coroutine_traits<boost::asio::awaitable<T, Executor>, Args...> 751 { 752 typedef boost::asio::detail::awaitable_frame<T, Executor> promise_type; 753 }; 754 755 }} // namespace std::experimental 756 757 # endif // defined(BOOST_ASIO_HAS_STD_COROUTINE) 758 #endif // !defined(GENERATING_DOCUMENTATION) 759 760 #include <boost/asio/detail/pop_options.hpp> 761 762 #endif // BOOST_ASIO_IMPL_AWAITABLE_HPP 763