1//
2// detail/impl/win_iocp_io_context.ipp
3// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4//
5// Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6//
7// Distributed under the Boost Software License, Version 1.0. (See accompanying
8// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9//
10
11#ifndef BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP
12#define BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP
13
14#if defined(_MSC_VER) && (_MSC_VER >= 1200)
15# pragma once
16#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
17
18#include <boost/asio/detail/config.hpp>
19
20#if defined(BOOST_ASIO_HAS_IOCP)
21
22#include <boost/asio/error.hpp>
23#include <boost/asio/detail/cstdint.hpp>
24#include <boost/asio/detail/handler_alloc_helpers.hpp>
25#include <boost/asio/detail/handler_invoke_helpers.hpp>
26#include <boost/asio/detail/limits.hpp>
27#include <boost/asio/detail/thread.hpp>
28#include <boost/asio/detail/throw_error.hpp>
29#include <boost/asio/detail/win_iocp_io_context.hpp>
30
31#include <boost/asio/detail/push_options.hpp>
32
33namespace boost {
34namespace asio {
35namespace detail {
36
37struct win_iocp_io_context::thread_function
38{
39  explicit thread_function(win_iocp_io_context* s)
40    : this_(s)
41  {
42  }
43
44  void operator()()
45  {
46    boost::system::error_code ec;
47    this_->run(ec);
48  }
49
50  win_iocp_io_context* this_;
51};
52
53struct win_iocp_io_context::work_finished_on_block_exit
54{
55  ~work_finished_on_block_exit()
56  {
57    io_context_->work_finished();
58  }
59
60  win_iocp_io_context* io_context_;
61};
62
63struct win_iocp_io_context::timer_thread_function
64{
65  void operator()()
66  {
67    while (::InterlockedExchangeAdd(&io_context_->shutdown_, 0) == 0)
68    {
69      if (::WaitForSingleObject(io_context_->waitable_timer_.handle,
70            INFINITE) == WAIT_OBJECT_0)
71      {
72        ::InterlockedExchange(&io_context_->dispatch_required_, 1);
73        ::PostQueuedCompletionStatus(io_context_->iocp_.handle,
74            0, wake_for_dispatch, 0);
75      }
76    }
77  }
78
79  win_iocp_io_context* io_context_;
80};
81
82win_iocp_io_context::win_iocp_io_context(
83    boost::asio::execution_context& ctx, int concurrency_hint, bool own_thread)
84  : execution_context_service_base<win_iocp_io_context>(ctx),
85    iocp_(),
86    outstanding_work_(0),
87    stopped_(0),
88    stop_event_posted_(0),
89    shutdown_(0),
90    gqcs_timeout_(get_gqcs_timeout()),
91    dispatch_required_(0),
92    concurrency_hint_(concurrency_hint)
93{
94  BOOST_ASIO_HANDLER_TRACKING_INIT;
95
96  iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,
97      static_cast<DWORD>(concurrency_hint >= 0 ? concurrency_hint : DWORD(~0)));
98  if (!iocp_.handle)
99  {
100    DWORD last_error = ::GetLastError();
101    boost::system::error_code ec(last_error,
102        boost::asio::error::get_system_category());
103    boost::asio::detail::throw_error(ec, "iocp");
104  }
105
106  if (own_thread)
107  {
108    ::InterlockedIncrement(&outstanding_work_);
109    thread_.reset(new boost::asio::detail::thread(thread_function(this)));
110  }
111}
112
113win_iocp_io_context::~win_iocp_io_context()
114{
115  if (thread_.get())
116  {
117    stop();
118    thread_->join();
119    thread_.reset();
120  }
121}
122
123void win_iocp_io_context::shutdown()
124{
125  ::InterlockedExchange(&shutdown_, 1);
126
127  if (timer_thread_.get())
128  {
129    LARGE_INTEGER timeout;
130    timeout.QuadPart = 1;
131    ::SetWaitableTimer(waitable_timer_.handle, &timeout, 1, 0, 0, FALSE);
132  }
133
134  if (thread_.get())
135  {
136    stop();
137    thread_->join();
138    thread_.reset();
139    ::InterlockedDecrement(&outstanding_work_);
140  }
141
142  while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
143  {
144    op_queue<win_iocp_operation> ops;
145    timer_queues_.get_all_timers(ops);
146    ops.push(completed_ops_);
147    if (!ops.empty())
148    {
149      while (win_iocp_operation* op = ops.front())
150      {
151        ops.pop();
152        ::InterlockedDecrement(&outstanding_work_);
153        op->destroy();
154      }
155    }
156    else
157    {
158      DWORD bytes_transferred = 0;
159      dword_ptr_t completion_key = 0;
160      LPOVERLAPPED overlapped = 0;
161      ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
162          &completion_key, &overlapped, gqcs_timeout_);
163      if (overlapped)
164      {
165        ::InterlockedDecrement(&outstanding_work_);
166        static_cast<win_iocp_operation*>(overlapped)->destroy();
167      }
168    }
169  }
170
171  if (timer_thread_.get())
172    timer_thread_->join();
173}
174
175boost::system::error_code win_iocp_io_context::register_handle(
176    HANDLE handle, boost::system::error_code& ec)
177{
178  if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0) == 0)
179  {
180    DWORD last_error = ::GetLastError();
181    ec = boost::system::error_code(last_error,
182        boost::asio::error::get_system_category());
183  }
184  else
185  {
186    ec = boost::system::error_code();
187  }
188  return ec;
189}
190
191size_t win_iocp_io_context::run(boost::system::error_code& ec)
192{
193  if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
194  {
195    stop();
196    ec = boost::system::error_code();
197    return 0;
198  }
199
200  win_iocp_thread_info this_thread;
201  thread_call_stack::context ctx(this, this_thread);
202
203  size_t n = 0;
204  while (do_one(INFINITE, this_thread, ec))
205    if (n != (std::numeric_limits<size_t>::max)())
206      ++n;
207  return n;
208}
209
210size_t win_iocp_io_context::run_one(boost::system::error_code& ec)
211{
212  if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
213  {
214    stop();
215    ec = boost::system::error_code();
216    return 0;
217  }
218
219  win_iocp_thread_info this_thread;
220  thread_call_stack::context ctx(this, this_thread);
221
222  return do_one(INFINITE, this_thread, ec);
223}
224
225size_t win_iocp_io_context::wait_one(long usec, boost::system::error_code& ec)
226{
227  if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
228  {
229    stop();
230    ec = boost::system::error_code();
231    return 0;
232  }
233
234  win_iocp_thread_info this_thread;
235  thread_call_stack::context ctx(this, this_thread);
236
237  return do_one(usec < 0 ? INFINITE : ((usec - 1) / 1000 + 1), this_thread, ec);
238}
239
240size_t win_iocp_io_context::poll(boost::system::error_code& ec)
241{
242  if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
243  {
244    stop();
245    ec = boost::system::error_code();
246    return 0;
247  }
248
249  win_iocp_thread_info this_thread;
250  thread_call_stack::context ctx(this, this_thread);
251
252  size_t n = 0;
253  while (do_one(0, this_thread, ec))
254    if (n != (std::numeric_limits<size_t>::max)())
255      ++n;
256  return n;
257}
258
259size_t win_iocp_io_context::poll_one(boost::system::error_code& ec)
260{
261  if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
262  {
263    stop();
264    ec = boost::system::error_code();
265    return 0;
266  }
267
268  win_iocp_thread_info this_thread;
269  thread_call_stack::context ctx(this, this_thread);
270
271  return do_one(0, this_thread, ec);
272}
273
274void win_iocp_io_context::stop()
275{
276  if (::InterlockedExchange(&stopped_, 1) == 0)
277  {
278    if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
279    {
280      if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
281      {
282        DWORD last_error = ::GetLastError();
283        boost::system::error_code ec(last_error,
284            boost::asio::error::get_system_category());
285        boost::asio::detail::throw_error(ec, "pqcs");
286      }
287    }
288  }
289}
290
291void win_iocp_io_context::capture_current_exception()
292{
293  if (thread_info_base* this_thread = thread_call_stack::contains(this))
294    this_thread->capture_current_exception();
295}
296
297void win_iocp_io_context::post_deferred_completion(win_iocp_operation* op)
298{
299  // Flag the operation as ready.
300  op->ready_ = 1;
301
302  // Enqueue the operation on the I/O completion port.
303  if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
304  {
305    // Out of resources. Put on completed queue instead.
306    mutex::scoped_lock lock(dispatch_mutex_);
307    completed_ops_.push(op);
308    ::InterlockedExchange(&dispatch_required_, 1);
309  }
310}
311
312void win_iocp_io_context::post_deferred_completions(
313    op_queue<win_iocp_operation>& ops)
314{
315  while (win_iocp_operation* op = ops.front())
316  {
317    ops.pop();
318
319    // Flag the operation as ready.
320    op->ready_ = 1;
321
322    // Enqueue the operation on the I/O completion port.
323    if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
324    {
325      // Out of resources. Put on completed queue instead.
326      mutex::scoped_lock lock(dispatch_mutex_);
327      completed_ops_.push(op);
328      completed_ops_.push(ops);
329      ::InterlockedExchange(&dispatch_required_, 1);
330    }
331  }
332}
333
334void win_iocp_io_context::abandon_operations(
335    op_queue<win_iocp_operation>& ops)
336{
337  while (win_iocp_operation* op = ops.front())
338  {
339    ops.pop();
340    ::InterlockedDecrement(&outstanding_work_);
341    op->destroy();
342  }
343}
344
345void win_iocp_io_context::on_pending(win_iocp_operation* op)
346{
347  if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
348  {
349    // Enqueue the operation on the I/O completion port.
350    if (!::PostQueuedCompletionStatus(iocp_.handle,
351          0, overlapped_contains_result, op))
352    {
353      // Out of resources. Put on completed queue instead.
354      mutex::scoped_lock lock(dispatch_mutex_);
355      completed_ops_.push(op);
356      ::InterlockedExchange(&dispatch_required_, 1);
357    }
358  }
359}
360
361void win_iocp_io_context::on_completion(win_iocp_operation* op,
362    DWORD last_error, DWORD bytes_transferred)
363{
364  // Flag that the operation is ready for invocation.
365  op->ready_ = 1;
366
367  // Store results in the OVERLAPPED structure.
368  op->Internal = reinterpret_cast<ulong_ptr_t>(
369      &boost::asio::error::get_system_category());
370  op->Offset = last_error;
371  op->OffsetHigh = bytes_transferred;
372
373  // Enqueue the operation on the I/O completion port.
374  if (!::PostQueuedCompletionStatus(iocp_.handle,
375        0, overlapped_contains_result, op))
376  {
377    // Out of resources. Put on completed queue instead.
378    mutex::scoped_lock lock(dispatch_mutex_);
379    completed_ops_.push(op);
380    ::InterlockedExchange(&dispatch_required_, 1);
381  }
382}
383
384void win_iocp_io_context::on_completion(win_iocp_operation* op,
385    const boost::system::error_code& ec, DWORD bytes_transferred)
386{
387  // Flag that the operation is ready for invocation.
388  op->ready_ = 1;
389
390  // Store results in the OVERLAPPED structure.
391  op->Internal = reinterpret_cast<ulong_ptr_t>(&ec.category());
392  op->Offset = ec.value();
393  op->OffsetHigh = bytes_transferred;
394
395  // Enqueue the operation on the I/O completion port.
396  if (!::PostQueuedCompletionStatus(iocp_.handle,
397        0, overlapped_contains_result, op))
398  {
399    // Out of resources. Put on completed queue instead.
400    mutex::scoped_lock lock(dispatch_mutex_);
401    completed_ops_.push(op);
402    ::InterlockedExchange(&dispatch_required_, 1);
403  }
404}
405
406size_t win_iocp_io_context::do_one(DWORD msec,
407    win_iocp_thread_info& this_thread, boost::system::error_code& ec)
408{
409  for (;;)
410  {
411    // Try to acquire responsibility for dispatching timers and completed ops.
412    if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
413    {
414      mutex::scoped_lock lock(dispatch_mutex_);
415
416      // Dispatch pending timers and operations.
417      op_queue<win_iocp_operation> ops;
418      ops.push(completed_ops_);
419      timer_queues_.get_ready_timers(ops);
420      post_deferred_completions(ops);
421      update_timeout();
422    }
423
424    // Get the next operation from the queue.
425    DWORD bytes_transferred = 0;
426    dword_ptr_t completion_key = 0;
427    LPOVERLAPPED overlapped = 0;
428    ::SetLastError(0);
429    BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle,
430        &bytes_transferred, &completion_key, &overlapped,
431        msec < gqcs_timeout_ ? msec : gqcs_timeout_);
432    DWORD last_error = ::GetLastError();
433
434    if (overlapped)
435    {
436      win_iocp_operation* op = static_cast<win_iocp_operation*>(overlapped);
437      boost::system::error_code result_ec(last_error,
438          boost::asio::error::get_system_category());
439
440      // We may have been passed the last_error and bytes_transferred in the
441      // OVERLAPPED structure itself.
442      if (completion_key == overlapped_contains_result)
443      {
444        result_ec = boost::system::error_code(static_cast<int>(op->Offset),
445            *reinterpret_cast<boost::system::error_category*>(op->Internal));
446        bytes_transferred = op->OffsetHigh;
447      }
448
449      // Otherwise ensure any result has been saved into the OVERLAPPED
450      // structure.
451      else
452      {
453        op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category());
454        op->Offset = result_ec.value();
455        op->OffsetHigh = bytes_transferred;
456      }
457
458      // Dispatch the operation only if ready. The operation may not be ready
459      // if the initiating function (e.g. a call to WSARecv) has not yet
460      // returned. This is because the initiating function still wants access
461      // to the operation's OVERLAPPED structure.
462      if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
463      {
464        // Ensure the count of outstanding work is decremented on block exit.
465        work_finished_on_block_exit on_exit = { this };
466        (void)on_exit;
467
468        op->complete(this, result_ec, bytes_transferred);
469        this_thread.rethrow_pending_exception();
470        ec = boost::system::error_code();
471        return 1;
472      }
473    }
474    else if (!ok)
475    {
476      if (last_error != WAIT_TIMEOUT)
477      {
478        ec = boost::system::error_code(last_error,
479            boost::asio::error::get_system_category());
480        return 0;
481      }
482
483      // If we're waiting indefinitely we need to keep going until we get a
484      // real handler.
485      if (msec == INFINITE)
486        continue;
487
488      ec = boost::system::error_code();
489      return 0;
490    }
491    else if (completion_key == wake_for_dispatch)
492    {
493      // We have been woken up to try to acquire responsibility for dispatching
494      // timers and completed operations.
495    }
496    else
497    {
498      // Indicate that there is no longer an in-flight stop event.
499      ::InterlockedExchange(&stop_event_posted_, 0);
500
501      // The stopped_ flag is always checked to ensure that any leftover
502      // stop events from a previous run invocation are ignored.
503      if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
504      {
505        // Wake up next thread that is blocked on GetQueuedCompletionStatus.
506        if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
507        {
508          if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
509          {
510            last_error = ::GetLastError();
511            ec = boost::system::error_code(last_error,
512                boost::asio::error::get_system_category());
513            return 0;
514          }
515        }
516
517        ec = boost::system::error_code();
518        return 0;
519      }
520    }
521  }
522}
523
524DWORD win_iocp_io_context::get_gqcs_timeout()
525{
526  OSVERSIONINFOEX osvi;
527  ZeroMemory(&osvi, sizeof(osvi));
528  osvi.dwOSVersionInfoSize = sizeof(osvi);
529  osvi.dwMajorVersion = 6ul;
530
531  const uint64_t condition_mask = ::VerSetConditionMask(
532      0, VER_MAJORVERSION, VER_GREATER_EQUAL);
533
534  if (!!::VerifyVersionInfo(&osvi, VER_MAJORVERSION, condition_mask))
535    return INFINITE;
536
537  return default_gqcs_timeout;
538}
539
540void win_iocp_io_context::do_add_timer_queue(timer_queue_base& queue)
541{
542  mutex::scoped_lock lock(dispatch_mutex_);
543
544  timer_queues_.insert(&queue);
545
546  if (!waitable_timer_.handle)
547  {
548    waitable_timer_.handle = ::CreateWaitableTimer(0, FALSE, 0);
549    if (waitable_timer_.handle == 0)
550    {
551      DWORD last_error = ::GetLastError();
552      boost::system::error_code ec(last_error,
553          boost::asio::error::get_system_category());
554      boost::asio::detail::throw_error(ec, "timer");
555    }
556
557    LARGE_INTEGER timeout;
558    timeout.QuadPart = -max_timeout_usec;
559    timeout.QuadPart *= 10;
560    ::SetWaitableTimer(waitable_timer_.handle,
561        &timeout, max_timeout_msec, 0, 0, FALSE);
562  }
563
564  if (!timer_thread_.get())
565  {
566    timer_thread_function thread_function = { this };
567    timer_thread_.reset(new thread(thread_function, 65536));
568  }
569}
570
571void win_iocp_io_context::do_remove_timer_queue(timer_queue_base& queue)
572{
573  mutex::scoped_lock lock(dispatch_mutex_);
574
575  timer_queues_.erase(&queue);
576}
577
578void win_iocp_io_context::update_timeout()
579{
580  if (timer_thread_.get())
581  {
582    // There's no point updating the waitable timer if the new timeout period
583    // exceeds the maximum timeout. In that case, we might as well wait for the
584    // existing period of the timer to expire.
585    long timeout_usec = timer_queues_.wait_duration_usec(max_timeout_usec);
586    if (timeout_usec < max_timeout_usec)
587    {
588      LARGE_INTEGER timeout;
589      timeout.QuadPart = -timeout_usec;
590      timeout.QuadPart *= 10;
591      ::SetWaitableTimer(waitable_timer_.handle,
592          &timeout, max_timeout_msec, 0, 0, FALSE);
593    }
594  }
595}
596
597} // namespace detail
598} // namespace asio
599} // namespace boost
600
601#include <boost/asio/detail/pop_options.hpp>
602
603#endif // defined(BOOST_ASIO_HAS_IOCP)
604
605#endif // BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP
606