//
// detail/impl/kqueue_reactor.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
// Copyright (c) 2005 Stefan Arentz (stefan at soze dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP
#define ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include "asio/detail/config.hpp"

#if defined(ASIO_HAS_KQUEUE)

#include "asio/detail/kqueue_reactor.hpp"
#include "asio/detail/throw_error.hpp"
#include "asio/error.hpp"

#include "asio/detail/push_options.hpp"

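// On NetBSD the udata field of struct kevent is declared as intptr_t rather
// than void*, so the pointer we stash there needs an explicit cast. On the
// other kqueue platforms the macro simply forwards to EV_SET unchanged.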
#if defined(__NetBSD__)
# define ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \
    EV_SET(ev, ident, filt, flags, fflags, data, \
      reinterpret_cast<intptr_t>(static_cast<void*>(udata)))
#else
# define ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \
    EV_SET(ev, ident, filt, flags, fflags, data, udata)
#endif

namespace clmdep_asio {
namespace detail {

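// The constructor creates the kqueue and registers the interrupter's read
// descriptor with it, so that interrupt() can wake up a thread that is
// blocked inside run().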
kqueue_reactor::kqueue_reactor(clmdep_asio::io_service& io_service)
  : clmdep_asio::detail::service_base<kqueue_reactor>(io_service),
    io_service_(use_service<io_service_impl>(io_service)),
    mutex_(),
    kqueue_fd_(do_kqueue_create()),
    interrupter_(),
    shutdown_(false)
{
  struct kevent events[1];
  ASIO_KQUEUE_EV_SET(&events[0], interrupter_.read_descriptor(),
      EVFILT_READ, EV_ADD, 0, 0, &interrupter_);
  if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
  {
    clmdep_asio::error_code error(errno,
        clmdep_asio::error::get_system_category());
    clmdep_asio::detail::throw_error(error);
  }
}

kqueue_reactor::~kqueue_reactor()
{
  close(kqueue_fd_);
}

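// shutdown_service() gathers every operation still queued against a
// registered descriptor, plus all outstanding timers, and hands them to the
// io_service to be abandoned (destroyed without being invoked).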
void kqueue_reactor::shutdown_service()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  op_queue<operation> ops;

  while (descriptor_state* state = registered_descriptors_.first())
  {
    for (int i = 0; i < max_ops; ++i)
      ops.push(state->op_queue_[i]);
    state->shutdown_ = true;
    registered_descriptors_.free(state);
  }

  timer_queues_.get_all_timers(ops);

  io_service_.abandon_operations(ops);
}

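// A kqueue descriptor is not inherited across fork(), so after a fork the
// child must build a fresh kqueue, recreate the interrupter and re-add the
// kevent filters for every descriptor that was previously registered.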
void kqueue_reactor::fork_service(clmdep_asio::io_service::fork_event fork_ev)
{
  if (fork_ev == clmdep_asio::io_service::fork_child)
  {
    // The kqueue descriptor is automatically closed in the child.
    kqueue_fd_ = -1;
    kqueue_fd_ = do_kqueue_create();

    interrupter_.recreate();

    struct kevent events[2];
    ASIO_KQUEUE_EV_SET(&events[0], interrupter_.read_descriptor(),
        EVFILT_READ, EV_ADD, 0, 0, &interrupter_);
    if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
    {
      clmdep_asio::error_code ec(errno,
          clmdep_asio::error::get_system_category());
      clmdep_asio::detail::throw_error(ec, "kqueue interrupter registration");
    }

    // Re-register all descriptors with kqueue.
    mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
    for (descriptor_state* state = registered_descriptors_.first();
        state != 0; state = state->next_)
    {
      if (state->num_kevents_ > 0)
      {
        ASIO_KQUEUE_EV_SET(&events[0], state->descriptor_,
            EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, state);
        ASIO_KQUEUE_EV_SET(&events[1], state->descriptor_,
            EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, state);
        if (::kevent(kqueue_fd_, events, state->num_kevents_, 0, 0, 0) == -1)
        {
          clmdep_asio::error_code ec(errno,
              clmdep_asio::error::get_system_category());
          clmdep_asio::detail::throw_error(ec, "kqueue re-registration");
        }
      }
    }
  }
}

void kqueue_reactor::init_task()
{
  io_service_.init_task();
}

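// register_descriptor() only allocates and initialises the per-descriptor
// state; the actual kevent registration is deferred until the first
// operation is started on the descriptor (see start_op() below).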
int kqueue_reactor::register_descriptor(socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data)
{
  descriptor_data = allocate_descriptor_state();

  mutex::scoped_lock lock(descriptor_data->mutex_);

  descriptor_data->descriptor_ = descriptor;
  descriptor_data->num_kevents_ = 0;
  descriptor_data->shutdown_ = false;

  return 0;
}

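// register_internal_descriptor() is used for descriptors owned by the
// library itself. Unlike register_descriptor(), it installs an EVFILT_READ
// filter immediately and queues the supplied operation so that it is
// executed when the descriptor becomes ready.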
int kqueue_reactor::register_internal_descriptor(
    int op_type, socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op)
{
  descriptor_data = allocate_descriptor_state();

  mutex::scoped_lock lock(descriptor_data->mutex_);

  descriptor_data->descriptor_ = descriptor;
  descriptor_data->num_kevents_ = 1;
  descriptor_data->shutdown_ = false;
  descriptor_data->op_queue_[op_type].push(op);

  struct kevent events[1];
  ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
      EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
  if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
    return errno;

  return 0;
}

void kqueue_reactor::move_descriptor(socket_type,
    kqueue_reactor::per_descriptor_data& target_descriptor_data,
    kqueue_reactor::per_descriptor_data& source_descriptor_data)
{
  target_descriptor_data = source_descriptor_data;
  source_descriptor_data = 0;
}

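// start_op() first tries to run the operation speculatively: if no operation
// of the same type is already queued (and, for reads, no exception operation
// would be starved of out-of-band data), op->perform() is attempted
// immediately and a successful completion is posted without touching the
// kqueue at all. If the speculative attempt is not allowed or does not
// complete, the descriptor is registered with edge-triggered (EV_CLEAR)
// filters and the operation is queued until run() reports readiness. The
// num_kevents table records how many filters each op type needs: read and
// exception operations register only EVFILT_READ, while write (and connect)
// operations register both EVFILT_READ and EVFILT_WRITE.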
void kqueue_reactor::start_op(int op_type, socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
    bool is_continuation, bool allow_speculative)
{
  if (!descriptor_data)
  {
    op->ec_ = clmdep_asio::error::bad_descriptor;
    post_immediate_completion(op, is_continuation);
    return;
  }

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (descriptor_data->shutdown_)
  {
    post_immediate_completion(op, is_continuation);
    return;
  }

  if (descriptor_data->op_queue_[op_type].empty())
  {
    static const int num_kevents[max_ops] = { 1, 2, 1 };

    if (allow_speculative
        && (op_type != read_op
          || descriptor_data->op_queue_[except_op].empty()))
    {
      if (op->perform())
      {
        descriptor_lock.unlock();
        io_service_.post_immediate_completion(op, is_continuation);
        return;
      }

      if (descriptor_data->num_kevents_ < num_kevents[op_type])
      {
        struct kevent events[2];
        ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
            EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
        ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
            EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
        if (::kevent(kqueue_fd_, events, num_kevents[op_type], 0, 0, 0) != -1)
        {
          descriptor_data->num_kevents_ = num_kevents[op_type];
        }
        else
        {
          op->ec_ = clmdep_asio::error_code(errno,
              clmdep_asio::error::get_system_category());
          io_service_.post_immediate_completion(op, is_continuation);
          return;
        }
      }
    }
    else
    {
      if (descriptor_data->num_kevents_ < num_kevents[op_type])
        descriptor_data->num_kevents_ = num_kevents[op_type];

      struct kevent events[2];
      ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
          EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
      ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
          EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
      ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
    }
  }

  descriptor_data->op_queue_[op_type].push(op);
  io_service_.work_started();
}

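// cancel_ops() completes every operation currently queued for the descriptor
// with clmdep_asio::error::operation_aborted. The kqueue registration itself
// is left in place.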
void kqueue_reactor::cancel_ops(socket_type,
    kqueue_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  op_queue<operation> ops;
  for (int i = 0; i < max_ops; ++i)
  {
    while (reactor_op* op = descriptor_data->op_queue_[i].front())
    {
      op->ec_ = clmdep_asio::error::operation_aborted;
      descriptor_data->op_queue_[i].pop();
      ops.push(op);
    }
  }

  descriptor_lock.unlock();

  io_service_.post_deferred_completions(ops);
}

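// deregister_descriptor() removes the descriptor's kevent filters (skipped
// when the caller is about to close the descriptor, since the kernel drops
// the filters automatically on close), aborts any queued operations and
// releases the per-descriptor state.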
void kqueue_reactor::deregister_descriptor(socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data, bool closing)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    if (closing)
    {
      // The descriptor will be automatically removed from the kqueue when it
      // is closed.
    }
    else
    {
      struct kevent events[2];
      ASIO_KQUEUE_EV_SET(&events[0], descriptor,
          EVFILT_READ, EV_DELETE, 0, 0, 0);
      ASIO_KQUEUE_EV_SET(&events[1], descriptor,
          EVFILT_WRITE, EV_DELETE, 0, 0, 0);
      ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
    }

    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
    {
      while (reactor_op* op = descriptor_data->op_queue_[i].front())
      {
        op->ec_ = clmdep_asio::error::operation_aborted;
        descriptor_data->op_queue_[i].pop();
        ops.push(op);
      }
    }

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    free_descriptor_state(descriptor_data);
    descriptor_data = 0;

    io_service_.post_deferred_completions(ops);
  }
}

void kqueue_reactor::deregister_internal_descriptor(socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    struct kevent events[2];
    ASIO_KQUEUE_EV_SET(&events[0], descriptor,
        EVFILT_READ, EV_DELETE, 0, 0, 0);
    ASIO_KQUEUE_EV_SET(&events[1], descriptor,
        EVFILT_WRITE, EV_DELETE, 0, 0, 0);
    ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);

    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
      ops.push(descriptor_data->op_queue_[i]);

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    free_descriptor_state(descriptor_data);
    descriptor_data = 0;
  }
}

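// run() performs one pass of the reactor loop: wait in kevent() for at most
// the timeout derived from the timer queues (or poll without blocking if
// block is false), then translate each ready kevent back into completed
// operations. The udata of every event points either at the interrupter or
// at the descriptor_state registered for that descriptor.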
void kqueue_reactor::run(bool block, op_queue<operation>& ops)
{
  mutex::scoped_lock lock(mutex_);

  // Determine how long to block while waiting for events.
  timespec timeout_buf = { 0, 0 };
  timespec* timeout = block ? get_timeout(timeout_buf) : &timeout_buf;

  lock.unlock();

  // Block on the kqueue descriptor.
  struct kevent events[128];
  int num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout);

  // Dispatch the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    void* ptr = reinterpret_cast<void*>(events[i].udata);
    if (ptr == &interrupter_)
    {
      interrupter_.reset();
    }
    else
    {
      descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
      mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

      if (events[i].filter == EVFILT_WRITE
          && descriptor_data->num_kevents_ == 2
          && descriptor_data->op_queue_[write_op].empty())
      {
        // Some descriptor types, like serial ports, don't seem to support
        // EV_CLEAR with EVFILT_WRITE. Since we have no pending write
        // operations we'll remove the EVFILT_WRITE registration here so that
        // we don't end up in a tight spin.
        struct kevent delete_events[1];
        ASIO_KQUEUE_EV_SET(&delete_events[0],
            descriptor_data->descriptor_, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
        ::kevent(kqueue_fd_, delete_events, 1, 0, 0, 0);
        descriptor_data->num_kevents_ = 1;
      }

      // Exception operations must be processed first to ensure that any
      // out-of-band data is read before normal data.
#if defined(__NetBSD__)
      static const unsigned int filter[max_ops] =
#else
      static const int filter[max_ops] =
#endif
        { EVFILT_READ, EVFILT_WRITE, EVFILT_READ };
      for (int j = max_ops - 1; j >= 0; --j)
      {
        if (events[i].filter == filter[j])
        {
          if (j != except_op || events[i].flags & EV_OOBAND)
          {
            while (reactor_op* op = descriptor_data->op_queue_[j].front())
            {
              if (events[i].flags & EV_ERROR)
              {
                op->ec_ = clmdep_asio::error_code(
                    static_cast<int>(events[i].data),
                    clmdep_asio::error::get_system_category());
                descriptor_data->op_queue_[j].pop();
                ops.push(op);
              }
              if (op->perform())
              {
                descriptor_data->op_queue_[j].pop();
                ops.push(op);
              }
              else
                break;
            }
          }
        }
      }
    }
  }

  lock.lock();
  timer_queues_.get_ready_timers(ops);
}

void kqueue_reactor::interrupt()
{
  interrupter_.interrupt();
}

int kqueue_reactor::do_kqueue_create()
{
  int fd = ::kqueue();
  if (fd == -1)
  {
    clmdep_asio::error_code ec(errno,
        clmdep_asio::error::get_system_category());
    clmdep_asio::detail::throw_error(ec, "kqueue");
  }
  return fd;
}

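// The per-descriptor state objects live in a pool guarded by
// registered_descriptors_mutex_, which is what lets shutdown_service() and
// fork_service() walk the complete set of registered descriptors.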
kqueue_reactor::descriptor_state* kqueue_reactor::allocate_descriptor_state()
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  return registered_descriptors_.alloc();
}

void kqueue_reactor::free_descriptor_state(kqueue_reactor::descriptor_state* s)
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  registered_descriptors_.free(s);
}

void kqueue_reactor::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}

void kqueue_reactor::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}

timespec* kqueue_reactor::get_timeout(timespec& ts)
{
  // By default we will wait no longer than 5 minutes. This will ensure that
  // any changes to the system clock are detected after no longer than this.
  long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
  ts.tv_sec = usec / 1000000;
  ts.tv_nsec = (usec % 1000000) * 1000;
  return &ts;
}

} // namespace detail
} // namespace clmdep_asio

#undef ASIO_KQUEUE_EV_SET

#include "asio/detail/pop_options.hpp"

#endif // defined(ASIO_HAS_KQUEUE)

#endif // ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP