1// 2// detail/impl/win_iocp_io_context.ipp 3// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 4// 5// Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com) 6// 7// Distributed under the Boost Software License, Version 1.0. (See accompanying 8// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 9// 10 11#ifndef BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP 12#define BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP 13 14#if defined(_MSC_VER) && (_MSC_VER >= 1200) 15# pragma once 16#endif // defined(_MSC_VER) && (_MSC_VER >= 1200) 17 18#include <boost/asio/detail/config.hpp> 19 20#if defined(BOOST_ASIO_HAS_IOCP) 21 22#include <boost/asio/error.hpp> 23#include <boost/asio/detail/cstdint.hpp> 24#include <boost/asio/detail/handler_alloc_helpers.hpp> 25#include <boost/asio/detail/handler_invoke_helpers.hpp> 26#include <boost/asio/detail/limits.hpp> 27#include <boost/asio/detail/thread.hpp> 28#include <boost/asio/detail/throw_error.hpp> 29#include <boost/asio/detail/win_iocp_io_context.hpp> 30 31#include <boost/asio/detail/push_options.hpp> 32 33namespace boost { 34namespace asio { 35namespace detail { 36 37struct win_iocp_io_context::thread_function 38{ 39 explicit thread_function(win_iocp_io_context* s) 40 : this_(s) 41 { 42 } 43 44 void operator()() 45 { 46 boost::system::error_code ec; 47 this_->run(ec); 48 } 49 50 win_iocp_io_context* this_; 51}; 52 53struct win_iocp_io_context::work_finished_on_block_exit 54{ 55 ~work_finished_on_block_exit() 56 { 57 io_context_->work_finished(); 58 } 59 60 win_iocp_io_context* io_context_; 61}; 62 63struct win_iocp_io_context::timer_thread_function 64{ 65 void operator()() 66 { 67 while (::InterlockedExchangeAdd(&io_context_->shutdown_, 0) == 0) 68 { 69 if (::WaitForSingleObject(io_context_->waitable_timer_.handle, 70 INFINITE) == WAIT_OBJECT_0) 71 { 72 ::InterlockedExchange(&io_context_->dispatch_required_, 1); 73 ::PostQueuedCompletionStatus(io_context_->iocp_.handle, 74 0, wake_for_dispatch, 0); 75 } 76 } 77 } 78 79 win_iocp_io_context* io_context_; 80}; 81 82win_iocp_io_context::win_iocp_io_context( 83 boost::asio::execution_context& ctx, int concurrency_hint, bool own_thread) 84 : execution_context_service_base<win_iocp_io_context>(ctx), 85 iocp_(), 86 outstanding_work_(0), 87 stopped_(0), 88 stop_event_posted_(0), 89 shutdown_(0), 90 gqcs_timeout_(get_gqcs_timeout()), 91 dispatch_required_(0), 92 concurrency_hint_(concurrency_hint) 93{ 94 BOOST_ASIO_HANDLER_TRACKING_INIT; 95 96 iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 97 static_cast<DWORD>(concurrency_hint >= 0 ? concurrency_hint : DWORD(~0))); 98 if (!iocp_.handle) 99 { 100 DWORD last_error = ::GetLastError(); 101 boost::system::error_code ec(last_error, 102 boost::asio::error::get_system_category()); 103 boost::asio::detail::throw_error(ec, "iocp"); 104 } 105 106 if (own_thread) 107 { 108 ::InterlockedIncrement(&outstanding_work_); 109 thread_.reset(new boost::asio::detail::thread(thread_function(this))); 110 } 111} 112 113win_iocp_io_context::~win_iocp_io_context() 114{ 115 if (thread_.get()) 116 { 117 stop(); 118 thread_->join(); 119 thread_.reset(); 120 } 121} 122 123void win_iocp_io_context::shutdown() 124{ 125 ::InterlockedExchange(&shutdown_, 1); 126 127 if (timer_thread_.get()) 128 { 129 LARGE_INTEGER timeout; 130 timeout.QuadPart = 1; 131 ::SetWaitableTimer(waitable_timer_.handle, &timeout, 1, 0, 0, FALSE); 132 } 133 134 if (thread_.get()) 135 { 136 stop(); 137 thread_->join(); 138 thread_.reset(); 139 ::InterlockedDecrement(&outstanding_work_); 140 } 141 142 while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0) 143 { 144 op_queue<win_iocp_operation> ops; 145 timer_queues_.get_all_timers(ops); 146 ops.push(completed_ops_); 147 if (!ops.empty()) 148 { 149 while (win_iocp_operation* op = ops.front()) 150 { 151 ops.pop(); 152 ::InterlockedDecrement(&outstanding_work_); 153 op->destroy(); 154 } 155 } 156 else 157 { 158 DWORD bytes_transferred = 0; 159 dword_ptr_t completion_key = 0; 160 LPOVERLAPPED overlapped = 0; 161 ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred, 162 &completion_key, &overlapped, gqcs_timeout_); 163 if (overlapped) 164 { 165 ::InterlockedDecrement(&outstanding_work_); 166 static_cast<win_iocp_operation*>(overlapped)->destroy(); 167 } 168 } 169 } 170 171 if (timer_thread_.get()) 172 timer_thread_->join(); 173} 174 175boost::system::error_code win_iocp_io_context::register_handle( 176 HANDLE handle, boost::system::error_code& ec) 177{ 178 if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0) == 0) 179 { 180 DWORD last_error = ::GetLastError(); 181 ec = boost::system::error_code(last_error, 182 boost::asio::error::get_system_category()); 183 } 184 else 185 { 186 ec = boost::system::error_code(); 187 } 188 return ec; 189} 190 191size_t win_iocp_io_context::run(boost::system::error_code& ec) 192{ 193 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0) 194 { 195 stop(); 196 ec = boost::system::error_code(); 197 return 0; 198 } 199 200 win_iocp_thread_info this_thread; 201 thread_call_stack::context ctx(this, this_thread); 202 203 size_t n = 0; 204 while (do_one(INFINITE, this_thread, ec)) 205 if (n != (std::numeric_limits<size_t>::max)()) 206 ++n; 207 return n; 208} 209 210size_t win_iocp_io_context::run_one(boost::system::error_code& ec) 211{ 212 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0) 213 { 214 stop(); 215 ec = boost::system::error_code(); 216 return 0; 217 } 218 219 win_iocp_thread_info this_thread; 220 thread_call_stack::context ctx(this, this_thread); 221 222 return do_one(INFINITE, this_thread, ec); 223} 224 225size_t win_iocp_io_context::wait_one(long usec, boost::system::error_code& ec) 226{ 227 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0) 228 { 229 stop(); 230 ec = boost::system::error_code(); 231 return 0; 232 } 233 234 win_iocp_thread_info this_thread; 235 thread_call_stack::context ctx(this, this_thread); 236 237 return do_one(usec < 0 ? INFINITE : ((usec - 1) / 1000 + 1), this_thread, ec); 238} 239 240size_t win_iocp_io_context::poll(boost::system::error_code& ec) 241{ 242 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0) 243 { 244 stop(); 245 ec = boost::system::error_code(); 246 return 0; 247 } 248 249 win_iocp_thread_info this_thread; 250 thread_call_stack::context ctx(this, this_thread); 251 252 size_t n = 0; 253 while (do_one(0, this_thread, ec)) 254 if (n != (std::numeric_limits<size_t>::max)()) 255 ++n; 256 return n; 257} 258 259size_t win_iocp_io_context::poll_one(boost::system::error_code& ec) 260{ 261 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0) 262 { 263 stop(); 264 ec = boost::system::error_code(); 265 return 0; 266 } 267 268 win_iocp_thread_info this_thread; 269 thread_call_stack::context ctx(this, this_thread); 270 271 return do_one(0, this_thread, ec); 272} 273 274void win_iocp_io_context::stop() 275{ 276 if (::InterlockedExchange(&stopped_, 1) == 0) 277 { 278 if (::InterlockedExchange(&stop_event_posted_, 1) == 0) 279 { 280 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0)) 281 { 282 DWORD last_error = ::GetLastError(); 283 boost::system::error_code ec(last_error, 284 boost::asio::error::get_system_category()); 285 boost::asio::detail::throw_error(ec, "pqcs"); 286 } 287 } 288 } 289} 290 291void win_iocp_io_context::capture_current_exception() 292{ 293 if (thread_info_base* this_thread = thread_call_stack::contains(this)) 294 this_thread->capture_current_exception(); 295} 296 297void win_iocp_io_context::post_deferred_completion(win_iocp_operation* op) 298{ 299 // Flag the operation as ready. 300 op->ready_ = 1; 301 302 // Enqueue the operation on the I/O completion port. 303 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op)) 304 { 305 // Out of resources. Put on completed queue instead. 306 mutex::scoped_lock lock(dispatch_mutex_); 307 completed_ops_.push(op); 308 ::InterlockedExchange(&dispatch_required_, 1); 309 } 310} 311 312void win_iocp_io_context::post_deferred_completions( 313 op_queue<win_iocp_operation>& ops) 314{ 315 while (win_iocp_operation* op = ops.front()) 316 { 317 ops.pop(); 318 319 // Flag the operation as ready. 320 op->ready_ = 1; 321 322 // Enqueue the operation on the I/O completion port. 323 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op)) 324 { 325 // Out of resources. Put on completed queue instead. 326 mutex::scoped_lock lock(dispatch_mutex_); 327 completed_ops_.push(op); 328 completed_ops_.push(ops); 329 ::InterlockedExchange(&dispatch_required_, 1); 330 } 331 } 332} 333 334void win_iocp_io_context::abandon_operations( 335 op_queue<win_iocp_operation>& ops) 336{ 337 while (win_iocp_operation* op = ops.front()) 338 { 339 ops.pop(); 340 ::InterlockedDecrement(&outstanding_work_); 341 op->destroy(); 342 } 343} 344 345void win_iocp_io_context::on_pending(win_iocp_operation* op) 346{ 347 if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1) 348 { 349 // Enqueue the operation on the I/O completion port. 350 if (!::PostQueuedCompletionStatus(iocp_.handle, 351 0, overlapped_contains_result, op)) 352 { 353 // Out of resources. Put on completed queue instead. 354 mutex::scoped_lock lock(dispatch_mutex_); 355 completed_ops_.push(op); 356 ::InterlockedExchange(&dispatch_required_, 1); 357 } 358 } 359} 360 361void win_iocp_io_context::on_completion(win_iocp_operation* op, 362 DWORD last_error, DWORD bytes_transferred) 363{ 364 // Flag that the operation is ready for invocation. 365 op->ready_ = 1; 366 367 // Store results in the OVERLAPPED structure. 368 op->Internal = reinterpret_cast<ulong_ptr_t>( 369 &boost::asio::error::get_system_category()); 370 op->Offset = last_error; 371 op->OffsetHigh = bytes_transferred; 372 373 // Enqueue the operation on the I/O completion port. 374 if (!::PostQueuedCompletionStatus(iocp_.handle, 375 0, overlapped_contains_result, op)) 376 { 377 // Out of resources. Put on completed queue instead. 378 mutex::scoped_lock lock(dispatch_mutex_); 379 completed_ops_.push(op); 380 ::InterlockedExchange(&dispatch_required_, 1); 381 } 382} 383 384void win_iocp_io_context::on_completion(win_iocp_operation* op, 385 const boost::system::error_code& ec, DWORD bytes_transferred) 386{ 387 // Flag that the operation is ready for invocation. 388 op->ready_ = 1; 389 390 // Store results in the OVERLAPPED structure. 391 op->Internal = reinterpret_cast<ulong_ptr_t>(&ec.category()); 392 op->Offset = ec.value(); 393 op->OffsetHigh = bytes_transferred; 394 395 // Enqueue the operation on the I/O completion port. 396 if (!::PostQueuedCompletionStatus(iocp_.handle, 397 0, overlapped_contains_result, op)) 398 { 399 // Out of resources. Put on completed queue instead. 400 mutex::scoped_lock lock(dispatch_mutex_); 401 completed_ops_.push(op); 402 ::InterlockedExchange(&dispatch_required_, 1); 403 } 404} 405 406size_t win_iocp_io_context::do_one(DWORD msec, 407 win_iocp_thread_info& this_thread, boost::system::error_code& ec) 408{ 409 for (;;) 410 { 411 // Try to acquire responsibility for dispatching timers and completed ops. 412 if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1) 413 { 414 mutex::scoped_lock lock(dispatch_mutex_); 415 416 // Dispatch pending timers and operations. 417 op_queue<win_iocp_operation> ops; 418 ops.push(completed_ops_); 419 timer_queues_.get_ready_timers(ops); 420 post_deferred_completions(ops); 421 update_timeout(); 422 } 423 424 // Get the next operation from the queue. 425 DWORD bytes_transferred = 0; 426 dword_ptr_t completion_key = 0; 427 LPOVERLAPPED overlapped = 0; 428 ::SetLastError(0); 429 BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, 430 &bytes_transferred, &completion_key, &overlapped, 431 msec < gqcs_timeout_ ? msec : gqcs_timeout_); 432 DWORD last_error = ::GetLastError(); 433 434 if (overlapped) 435 { 436 win_iocp_operation* op = static_cast<win_iocp_operation*>(overlapped); 437 boost::system::error_code result_ec(last_error, 438 boost::asio::error::get_system_category()); 439 440 // We may have been passed the last_error and bytes_transferred in the 441 // OVERLAPPED structure itself. 442 if (completion_key == overlapped_contains_result) 443 { 444 result_ec = boost::system::error_code(static_cast<int>(op->Offset), 445 *reinterpret_cast<boost::system::error_category*>(op->Internal)); 446 bytes_transferred = op->OffsetHigh; 447 } 448 449 // Otherwise ensure any result has been saved into the OVERLAPPED 450 // structure. 451 else 452 { 453 op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category()); 454 op->Offset = result_ec.value(); 455 op->OffsetHigh = bytes_transferred; 456 } 457 458 // Dispatch the operation only if ready. The operation may not be ready 459 // if the initiating function (e.g. a call to WSARecv) has not yet 460 // returned. This is because the initiating function still wants access 461 // to the operation's OVERLAPPED structure. 462 if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1) 463 { 464 // Ensure the count of outstanding work is decremented on block exit. 465 work_finished_on_block_exit on_exit = { this }; 466 (void)on_exit; 467 468 op->complete(this, result_ec, bytes_transferred); 469 this_thread.rethrow_pending_exception(); 470 ec = boost::system::error_code(); 471 return 1; 472 } 473 } 474 else if (!ok) 475 { 476 if (last_error != WAIT_TIMEOUT) 477 { 478 ec = boost::system::error_code(last_error, 479 boost::asio::error::get_system_category()); 480 return 0; 481 } 482 483 // If we're waiting indefinitely we need to keep going until we get a 484 // real handler. 485 if (msec == INFINITE) 486 continue; 487 488 ec = boost::system::error_code(); 489 return 0; 490 } 491 else if (completion_key == wake_for_dispatch) 492 { 493 // We have been woken up to try to acquire responsibility for dispatching 494 // timers and completed operations. 495 } 496 else 497 { 498 // Indicate that there is no longer an in-flight stop event. 499 ::InterlockedExchange(&stop_event_posted_, 0); 500 501 // The stopped_ flag is always checked to ensure that any leftover 502 // stop events from a previous run invocation are ignored. 503 if (::InterlockedExchangeAdd(&stopped_, 0) != 0) 504 { 505 // Wake up next thread that is blocked on GetQueuedCompletionStatus. 506 if (::InterlockedExchange(&stop_event_posted_, 1) == 0) 507 { 508 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0)) 509 { 510 last_error = ::GetLastError(); 511 ec = boost::system::error_code(last_error, 512 boost::asio::error::get_system_category()); 513 return 0; 514 } 515 } 516 517 ec = boost::system::error_code(); 518 return 0; 519 } 520 } 521 } 522} 523 524DWORD win_iocp_io_context::get_gqcs_timeout() 525{ 526 OSVERSIONINFOEX osvi; 527 ZeroMemory(&osvi, sizeof(osvi)); 528 osvi.dwOSVersionInfoSize = sizeof(osvi); 529 osvi.dwMajorVersion = 6ul; 530 531 const uint64_t condition_mask = ::VerSetConditionMask( 532 0, VER_MAJORVERSION, VER_GREATER_EQUAL); 533 534 if (!!::VerifyVersionInfo(&osvi, VER_MAJORVERSION, condition_mask)) 535 return INFINITE; 536 537 return default_gqcs_timeout; 538} 539 540void win_iocp_io_context::do_add_timer_queue(timer_queue_base& queue) 541{ 542 mutex::scoped_lock lock(dispatch_mutex_); 543 544 timer_queues_.insert(&queue); 545 546 if (!waitable_timer_.handle) 547 { 548 waitable_timer_.handle = ::CreateWaitableTimer(0, FALSE, 0); 549 if (waitable_timer_.handle == 0) 550 { 551 DWORD last_error = ::GetLastError(); 552 boost::system::error_code ec(last_error, 553 boost::asio::error::get_system_category()); 554 boost::asio::detail::throw_error(ec, "timer"); 555 } 556 557 LARGE_INTEGER timeout; 558 timeout.QuadPart = -max_timeout_usec; 559 timeout.QuadPart *= 10; 560 ::SetWaitableTimer(waitable_timer_.handle, 561 &timeout, max_timeout_msec, 0, 0, FALSE); 562 } 563 564 if (!timer_thread_.get()) 565 { 566 timer_thread_function thread_function = { this }; 567 timer_thread_.reset(new thread(thread_function, 65536)); 568 } 569} 570 571void win_iocp_io_context::do_remove_timer_queue(timer_queue_base& queue) 572{ 573 mutex::scoped_lock lock(dispatch_mutex_); 574 575 timer_queues_.erase(&queue); 576} 577 578void win_iocp_io_context::update_timeout() 579{ 580 if (timer_thread_.get()) 581 { 582 // There's no point updating the waitable timer if the new timeout period 583 // exceeds the maximum timeout. In that case, we might as well wait for the 584 // existing period of the timer to expire. 585 long timeout_usec = timer_queues_.wait_duration_usec(max_timeout_usec); 586 if (timeout_usec < max_timeout_usec) 587 { 588 LARGE_INTEGER timeout; 589 timeout.QuadPart = -timeout_usec; 590 timeout.QuadPart *= 10; 591 ::SetWaitableTimer(waitable_timer_.handle, 592 &timeout, max_timeout_msec, 0, 0, FALSE); 593 } 594 } 595} 596 597} // namespace detail 598} // namespace asio 599} // namespace boost 600 601#include <boost/asio/detail/pop_options.hpp> 602 603#endif // defined(BOOST_ASIO_HAS_IOCP) 604 605#endif // BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP 606